aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2020-07-10 09:44:32 +0800
committerGitHub <noreply@github.com>2020-07-10 09:44:32 +0800
commite67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/command
parent2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
downloadseaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz
seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go108
-rw-r--r--weed/command/benchmark.go77
-rw-r--r--weed/command/command.go18
-rw-r--r--weed/command/compact.go12
-rw-r--r--weed/command/download.go1
-rw-r--r--weed/command/export.go58
-rw-r--r--weed/command/filer.go60
-rw-r--r--weed/command/filer_copy.go449
-rw-r--r--weed/command/filer_export.go187
-rw-r--r--weed/command/filer_replication.go30
-rw-r--r--weed/command/fix.go40
-rw-r--r--weed/command/master.go193
-rw-r--r--weed/command/mount.go67
-rw-r--r--weed/command/mount_darwin.go13
-rw-r--r--weed/command/mount_freebsd.go13
-rw-r--r--weed/command/mount_linux.go155
-rw-r--r--weed/command/mount_notsupported.go1
-rw-r--r--weed/command/mount_std.go189
-rw-r--r--weed/command/msg_broker.go114
-rw-r--r--weed/command/s3.go151
-rw-r--r--weed/command/scaffold.go225
-rw-r--r--weed/command/scaffold_test.go44
-rw-r--r--weed/command/server.go215
-rw-r--r--weed/command/shell.go66
-rw-r--r--weed/command/upload.go38
-rw-r--r--weed/command/version.go2
-rw-r--r--weed/command/volume.go248
-rw-r--r--weed/command/watch.go65
-rw-r--r--weed/command/webdav.go142
29 files changed, 2064 insertions, 917 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 072aea75b..615be80cf 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -3,6 +3,11 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
)
@@ -12,10 +17,12 @@ var (
)
type BackupOptions struct {
- master *string
- collection *string
- dir *string
- volumeId *int
+ master *string
+ collection *string
+ dir *string
+ volumeId *int
+ ttl *string
+ replication *string
}
func init() {
@@ -24,32 +31,45 @@ func init() {
s.collection = cmdBackup.Flag.String("collection", "", "collection name")
s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files")
s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
+ s.ttl = cmdBackup.Flag.String("ttl", "", `backup volume's time to live, format:
+ 3m: 3 minutes
+ 4h: 4 hours
+ 5d: 5 days
+ 6w: 6 weeks
+ 7M: 7 months
+ 8y: 8 years
+ default is the same with origin`)
+ s.replication = cmdBackup.Flag.String("replication", "", "backup volume's replication, default is the same with origin")
}
var cmdBackup = &Command{
UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333",
Short: "incrementally backup a volume to local folder",
Long: `Incrementally backup volume data.
-
+
It is expected that you use this inside a script, to loop through
all possible volume ids that needs to be backup to local folder.
-
+
The volume id does not need to exist locally or even remotely.
This will help to backup future new volumes.
-
+
Usually backing up is just copying the .dat (and .idx) files.
But it's tricky to incrementally copy the differences.
-
+
The complexity comes when there are multiple addition, deletion and compaction.
- This tool will handle them correctly and efficiently, avoiding unnecessary data transporation.
+ This tool will handle them correctly and efficiently, avoiding unnecessary data transportation.
`,
}
func runBackup(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
if *s.volumeId == -1 {
return false
}
- vid := storage.VolumeId(*s.volumeId)
+ vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.Lookup(*s.master, vid.String())
@@ -59,29 +79,73 @@ func runBackup(cmd *Command, args []string) bool {
}
volumeServer := lookup.Locations[0].Url
- stats, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid))
+ stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil {
fmt.Printf("Error get volume %d status: %v\n", vid, err)
return true
}
- ttl, err := storage.ReadTTL(stats.Ttl)
- if err != nil {
- fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
- return true
+ var ttl *needle.TTL
+ if *s.ttl != "" {
+ ttl, err = needle.ReadTTL(*s.ttl)
+ if err != nil {
+ fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
+ return true
+ }
+ } else {
+ ttl, err = needle.ReadTTL(stats.Ttl)
+ if err != nil {
+ fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
+ return true
+ }
}
- replication, err := storage.NewReplicaPlacementFromString(stats.Replication)
- if err != nil {
- fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
- return true
+ var replication *super_block.ReplicaPlacement
+ if *s.replication != "" {
+ replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
+ if err != nil {
+ fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
+ return true
+ }
+ } else {
+ replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
+ if err != nil {
+ fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
+ return true
+ }
}
-
- v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0)
+ v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
}
- if err := v.Synchronize(volumeServer); err != nil {
+ if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
+ if err = v.Compact2(30*1024*1024*1024, 0); err != nil {
+ fmt.Printf("Compact Volume before synchronizing %v\n", err)
+ return true
+ }
+ if err = v.CommitCompact(); err != nil {
+ fmt.Printf("Commit Compact before synchronizing %v\n", err)
+ return true
+ }
+ v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
+ v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
+ }
+
+ datSize, _, _ := v.FileStat()
+
+ if datSize > stats.TailOffset {
+ // remove the old data
+ v.Destroy()
+ // recreate an empty volume
+ v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
+ if err != nil {
+ fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
+ return true
+ }
+ }
+ defer v.Close()
+
+ if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
return true
}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 60fd88ccd..4a9a9619a 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -2,7 +2,6 @@ package command
import (
"bufio"
- "context"
"fmt"
"io"
"math"
@@ -15,6 +14,8 @@ import (
"sync"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -33,15 +34,18 @@ type BenchmarkOptions struct {
read *bool
sequentialRead *bool
collection *string
+ replication *string
cpuprofile *string
maxCpu *int
- secretKey *string
+ grpcDialOption grpc.DialOption
+ masterClient *wdclient.MasterClient
+ fsync *bool
}
var (
- b BenchmarkOptions
- sharedBytes []byte
- masterClient *wdclient.MasterClient
+ b BenchmarkOptions
+ sharedBytes []byte
+ isSecure bool
)
func init() {
@@ -57,14 +61,15 @@ func init() {
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
+ b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
+ b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
sharedBytes = make([]byte, 1024)
}
var cmdBenchmark = &Command{
- UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
+ UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
Short: "benchmark on writing millions of files and read out",
Long: `benchmark on an empty SeaweedFS file system.
@@ -102,7 +107,11 @@ var (
)
func runBenchmark(cmd *Command, args []string) bool {
- fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+
+ util.LoadConfiguration("security", false)
+ b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
}
@@ -116,9 +125,9 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ","))
- go masterClient.KeepConnectedToMaster()
- masterClient.WaitUntilConnected()
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ","))
+ go b.masterClient.KeepConnectedToMaster()
+ b.masterClient.WaitUntilConnected()
if *b.write {
benchWrite()
@@ -188,7 +197,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup
- secret := security.Secret(*b.secretKey)
for i := 0; i < 7; i++ {
waitForDeletions.Add(1)
@@ -198,8 +206,11 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now()))
}
- if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid,
- security.GenJwt(secret, df.fp.Fid)); e == nil {
+ var jwtAuthorization security.EncodedJwt
+ if isSecure {
+ jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
+ }
+ if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++
} else {
s.failed++
@@ -214,17 +225,22 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
fp := &operation.FilePart{
- Reader: &FakeReader{id: uint64(id), size: fileSize},
+ Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
FileSize: fileSize,
MimeType: "image/bench", // prevent gzip benchmark content
+ Fsync: *b.fsync,
}
ar := &operation.VolumeAssignRequest{
- Count: 1,
- Collection: *b.collection,
+ Count: 1,
+ Collection: *b.collection,
+ Replication: *b.replication,
}
- if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil {
+ if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
- if _, err := fp.Upload(0, masterClient.GetMaster(), secret); err == nil {
+ if !isSecure && assignResult.Auth != "" {
+ isSecure = true
+ }
+ if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -264,19 +280,24 @@ func readFiles(fileIdLineChan chan string, s *stat) {
fmt.Printf("reading file %s\n", fid)
}
start := time.Now()
- url, err := masterClient.LookupFileId(fid)
+ var bytesRead int
+ var err error
+ url, err := b.masterClient.LookupFileId(fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
- if bytesRead, err := util.Get(url); err == nil {
+ var bytes []byte
+ bytes, err = util.Get(url)
+ bytesRead = len(bytes)
+ if err == nil {
s.completed++
- s.transferred += int64(len(bytesRead))
+ s.transferred += int64(bytesRead)
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- fmt.Printf("Failed to read %s error:%v\n", url, err)
+ fmt.Printf("Failed to read %s error:%v\n", fid, err)
}
}
}
@@ -338,7 +359,7 @@ func readFileIds(fileName string, fileIdLineChan chan string) {
}
const (
- benchResolution = 10000 //0.1 microsecond
+ benchResolution = 10000 // 0.1 microsecond
benchBucket = 1000000000 / benchResolution
)
@@ -461,7 +482,7 @@ func (s *stats) printStats() {
fmt.Printf("\nConnection Times (ms)\n")
fmt.Printf(" min avg max std\n")
fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
- //printing percentiles
+ // printing percentiles
fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
percentiles := make([]int, len(percentages))
for i := 0; i < len(percentages); i++ {
@@ -495,8 +516,9 @@ func (s *stats) printStats() {
// a fake reader to generate content to upload
type FakeReader struct {
- id uint64 // an id number
- size int64 // max bytes
+ id uint64 // an id number
+ size int64 // max bytes
+ random *rand.Rand
}
func (l *FakeReader) Read(p []byte) (n int, err error) {
@@ -512,6 +534,7 @@ func (l *FakeReader) Read(p []byte) (n int, err error) {
for i := 0; i < 8; i++ {
p[i] = byte(l.id >> uint(i*8))
}
+ l.random.Read(p[8:])
}
l.size -= int64(n)
return
diff --git a/weed/command/command.go b/weed/command/command.go
index 91b9bf3fc..9a41a8a7c 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -12,21 +12,23 @@ var Commands = []*Command{
cmdBackup,
cmdCompact,
cmdCopy,
- cmdFix,
- cmdFilerExport,
+ cmdDownload,
+ cmdExport,
+ cmdFiler,
cmdFilerReplicate,
- cmdServer,
+ cmdFix,
cmdMaster,
- cmdFiler,
+ cmdMount,
cmdS3,
- cmdUpload,
- cmdDownload,
+ cmdMsgBroker,
cmdScaffold,
+ cmdServer,
cmdShell,
+ cmdWatch,
+ cmdUpload,
cmdVersion,
cmdVolume,
- cmdExport,
- cmdMount,
+ cmdWebDav,
}
type Command struct {
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 0dd4efe0e..4e28aa725 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -3,6 +3,7 @@ package command
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func init() {
@@ -16,6 +17,9 @@ var cmdCompact = &Command{
The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file.
+ For method=0, it compacts based on the .dat file, works if .idx file is corrupted.
+ For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files.
+
`,
}
@@ -35,18 +39,18 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20)
- vid := storage.VolumeId(*compactVolumeId)
+ vid := needle.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
- storage.NeedleMapInMemory, nil, nil, preallocate)
+ storage.NeedleMapInMemory, nil, nil, preallocate, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
if *compactMethod == 0 {
- if err = v.Compact(preallocate); err != nil {
+ if err = v.Compact(preallocate, 0); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
- if err = v.Compact2(); err != nil {
+ if err = v.Compact2(preallocate, 0); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
diff --git a/weed/command/download.go b/weed/command/download.go
index b3e33defd..be0eb47e5 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -71,6 +71,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
+ io.Copy(ioutil.Discard, rc)
return err
}
defer f.Close()
diff --git a/weed/command/export.go b/weed/command/export.go
index 5c7e064ce..5d304b5a0 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"fmt"
+ "io"
"os"
"path"
"path/filepath"
@@ -14,8 +15,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "io"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -66,17 +70,17 @@ var (
localLocation, _ = time.LoadLocation("Local")
)
-func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Version, deleted bool) {
- key := storage.NewFileIdFromNeedle(vid, n).String()
+func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool) {
+ key := needle.NewFileIdFromNeedle(vid, n).String()
size := n.DataSize
- if version == storage.Version1 {
+ if version == needle.Version1 {
size = n.Size
}
fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\n",
key,
n.Name,
size,
- n.IsGzipped(),
+ n.IsCompressed(),
n.Mime,
n.LastModifiedString(),
n.Ttl.String(),
@@ -85,14 +89,14 @@ func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Versio
}
type VolumeFileScanner4Export struct {
- version storage.Version
+ version needle.Version
counter int
- needleMap *storage.NeedleMap
- vid storage.VolumeId
+ needleMap *needle_map.MemDb
+ vid needle.VolumeId
}
-func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -100,14 +104,14 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool {
return true
}
-func (scanner *VolumeFileScanner4Export) VisitNeedle(n *storage.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
needleMap := scanner.needleMap
vid := scanner.vid
nv, ok := needleMap.Get(n.Id)
- glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v",
- n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped(), ok, nv)
- if ok && nv.Size > 0 && int64(nv.Offset)*types.NeedlePaddingSize == offset {
+ glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v",
+ n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv)
+ if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset {
if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) {
glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d",
n.LastModified, newerThanUnix)
@@ -189,16 +193,13 @@ func runExport(cmd *Command, args []string) bool {
if *export.collection != "" {
fileName = *export.collection + "_" + fileName
}
- vid := storage.VolumeId(*export.volumeId)
- indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
+ vid := needle.VolumeId(*export.volumeId)
- needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
- if err != nil {
- glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
+ needleMap := needle_map.NewMemDb()
+ defer needleMap.Close()
+
+ if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
+ glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
volumeFileScanner := &VolumeFileScanner4Export{
@@ -225,8 +226,8 @@ type nameParams struct {
Ext string
}
-func writeFile(vid storage.VolumeId, n *storage.Needle) (err error) {
- key := storage.NewFileIdFromNeedle(vid, n).String()
+func writeFile(vid needle.VolumeId, n *needle.Needle) (err error) {
+ key := needle.NewFileIdFromNeedle(vid, n).String()
fileNameTemplateBuffer.Reset()
if err = fileNameTemplate.Execute(fileNameTemplateBuffer,
nameParams{
@@ -242,8 +243,11 @@ func writeFile(vid storage.VolumeId, n *storage.Needle) (err error) {
fileName := fileNameTemplateBuffer.String()
- if n.IsGzipped() && path.Ext(fileName) != ".gz" {
- fileName = fileName + ".gz"
+ if n.IsCompressed() {
+ if util.IsGzippedContent(n.Data) && path.Ext(fileName) != ".gz" {
+ fileName = fileName + ".gz"
+ }
+ // TODO other compression method
}
tarHeader.Name, tarHeader.Size = fileName, int64(len(n.Data))
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 0c1950f96..b52b01149 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -6,11 +6,14 @@ import (
"strings"
"time"
+ "google.golang.org/grpc/reflection"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc/reflection"
)
var (
@@ -20,18 +23,19 @@ var (
type FilerOptions struct {
masters *string
ip *string
+ bindIp *string
port *int
- grpcPort *int
publicPort *int
collection *string
defaultReplicaPlacement *string
- redirectOnRead *bool
disableDirListing *bool
maxMB *int
- secretKey *string
dirListingLimit *int
dataCenter *string
enableNotification *bool
+ disableHttp *bool
+ cipher *bool
+ peers *string
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
@@ -41,17 +45,18 @@ func init() {
cmdFiler.Run = runFiler // break init cycle
f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
- f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
+ f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
+ f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
- f.grpcPort = cmdFiler.Flag.Int("port.grpc", 0, "filer grpc server listen port, default to http port + 10000")
- f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public")
+ f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
- f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
- f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
+ f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
+ f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
+ f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
}
var cmdFiler = &Command{
@@ -70,13 +75,15 @@ var cmdFiler = &Command{
The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order.
- The example filer.toml configuration file can be generated by "weed scaffold filer"
+ The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
`,
}
func runFiler(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
+
f.startFiler()
return true
@@ -91,30 +98,38 @@ func (fo *FilerOptions) startFiler() {
publicVolumeMux = http.NewServeMux()
}
- defaultLevelDbDirectory := "./filerdb"
+ defaultLevelDbDirectory := "./filerldb2"
if fo.defaultLevelDbDirectory != nil {
- defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerdb"
+ defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerldb2"
+ }
+
+ var peers []string
+ if *fo.peers != "" {
+ peers = strings.Split(*fo.peers, ",")
}
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
- Masters: strings.Split(*f.masters, ","),
+ Masters: strings.Split(*fo.masters, ","),
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
- RedirectOnRead: *fo.redirectOnRead,
DisableDirListing: *fo.disableDirListing,
MaxMB: *fo.maxMB,
- SecretKey: *fo.secretKey,
DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter,
DefaultLevelDbDir: defaultLevelDbDirectory,
+ DisableHttp: *fo.disableHttp,
+ Host: *fo.ip,
+ Port: uint32(*fo.port),
+ Cipher: *fo.cipher,
+ Filers: peers,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
}
if *fo.publicPort != 0 {
- publicListeningAddress := *fo.ip + ":" + strconv.Itoa(*fo.publicPort)
- glog.V(0).Infoln("Start Seaweed filer server", util.VERSION, "public at", publicListeningAddress)
+ publicListeningAddress := *fo.bindIp + ":" + strconv.Itoa(*fo.publicPort)
+ glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, 0)
if e != nil {
glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e)
@@ -126,9 +141,9 @@ func (fo *FilerOptions) startFiler() {
}()
}
- glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.VERSION, *fo.ip, *fo.port)
+ glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
filerListener, e := util.NewListener(
- ":"+strconv.Itoa(*fo.port),
+ *fo.bindIp+":"+strconv.Itoa(*fo.port),
time.Duration(10)*time.Second,
)
if e != nil {
@@ -136,15 +151,12 @@ func (fo *FilerOptions) startFiler() {
}
// starting grpc server
- grpcPort := *fo.grpcPort
- if grpcPort == 0 {
- grpcPort = *fo.port + 10000
- }
+ grpcPort := *fo.port + 10000
grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer()
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 3638bcb27..2d6ba94d6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -1,52 +1,62 @@
package command
import (
+ "context"
"fmt"
+ "io"
"io/ioutil"
+ "net/http"
"net/url"
"os"
"path/filepath"
+ "strconv"
"strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
- "context"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
- "io"
- "net/http"
- "strconv"
- "time"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
)
var (
- copy CopyOptions
+ copy CopyOptions
+ waitGroup sync.WaitGroup
)
type CopyOptions struct {
- filerGrpcPort *int
- master *string
- include *string
- replication *string
- collection *string
- ttl *string
- maxMB *int
- secretKey *string
-
- secret security.Secret
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ masterClient *wdclient.MasterClient
+ concurrenctFiles *int
+ concurrenctChunks *int
+ grpcDialOption grpc.DialOption
+ masters []string
+ cipher bool
+ ttlSec int32
}
func init() {
cmdCopy.Run = runCopy // break init cycle
cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
- copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
- copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
- copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
+ copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
+ copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
}
var cmdCopy = &Command{
@@ -66,7 +76,9 @@ var cmdCopy = &Command{
}
func runCopy(cmd *Command, args []string) bool {
- copy.secret = security.Secret(*copy.secretKey)
+
+ util.LoadConfiguration("security", false)
+
if len(args) <= 1 {
return false
}
@@ -96,221 +108,380 @@ func runCopy(cmd *Command, args []string) bool {
}
filerGrpcPort := filerPort + 10000
- if *copy.filerGrpcPort != 0 {
- filerGrpcPort = uint64(*copy.filerGrpcPort)
+ filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+ copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
+ if err != nil {
+ fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
+ return false
+ }
+ if *copy.collection == "" {
+ *copy.collection = collection
+ }
+ if *copy.replication == "" {
+ *copy.replication = replication
}
+ if *copy.maxMB == 0 {
+ *copy.maxMB = int(maxMB)
+ }
+ copy.masters = masters
+ copy.cipher = cipher
- filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+ ttl, err := needle.ReadTTL(*copy.ttl)
+ if err != nil {
+ fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
+ return false
+ }
+ copy.ttlSec = int32(ttl.Minutes()) * 60
+
+ if *cmdCopy.IsDebug {
+ grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
+ }
+
+ fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
- for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
- return false
+ go func() {
+ defer close(fileCopyTaskChan)
+ for _, fileOrDir := range fileOrDirs {
+ if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
+ fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
+ break
+ }
}
+ }()
+ for i := 0; i < *copy.concurrenctFiles; i++ {
+ waitGroup.Add(1)
+ go func() {
+ defer waitGroup.Done()
+ worker := FileCopyWorker{
+ options: &copy,
+ filerHost: filerUrl.Host,
+ filerGrpcAddress: filerGrpcAddress,
+ }
+ if err := worker.copyFiles(fileCopyTaskChan); err != nil {
+ fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
+ return
+ }
+ }()
}
+ waitGroup.Wait()
+
return true
}
-func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
- f, err := os.Open(fileOrDir)
- if err != nil {
- fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
- return false
- }
- defer f.Close()
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
+ cipher = resp.Cipher
+ return nil
+ })
+ return
+}
+
+func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
- fi, err := f.Stat()
+ fi, err := os.Stat(fileOrDir)
if err != nil {
- fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
- return false
+ fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
+ return nil
}
mode := fi.Mode()
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
- if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
- return false
+ if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
+ return err
}
}
- return true
+ return nil
}
+ uid, gid := util.GetFileUidGid(fi)
+
+ fileCopyTaskChan <- FileCopyTask{
+ sourceLocation: fileOrDir,
+ destinationUrlPath: destPath,
+ fileSize: fi.Size(),
+ fileMode: fi.Mode(),
+ uid: uid,
+ gid: gid,
+ }
+
+ return nil
+}
+
+type FileCopyWorker struct {
+ options *CopyOptions
+ filerHost string
+ filerGrpcAddress string
+}
+
+func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
+ for task := range fileCopyTaskChan {
+ if err := worker.doEachCopy(task); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type FileCopyTask struct {
+ sourceLocation string
+ destinationUrlPath string
+ fileSize int64
+ fileMode os.FileMode
+ uid uint32
+ gid uint32
+}
+
+func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
+
+ f, err := os.Open(task.sourceLocation)
+ if err != nil {
+ fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
+ if _, ok := err.(*os.PathError); ok {
+ fmt.Printf("skipping %s\n", task.sourceLocation)
+ return nil
+ }
+ return err
+ }
+ defer f.Close()
+
// this is a regular file
- if *copy.include != "" {
- if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
- return true
+ if *worker.options.include != "" {
+ if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
+ return nil
}
}
// find the chunk count
- chunkSize := int64(*copy.maxMB * 1024 * 1024)
+ chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
chunkCount := 1
- if chunkSize > 0 && fi.Size() > chunkSize {
- chunkCount = int(fi.Size()/chunkSize) + 1
+ if chunkSize > 0 && task.fileSize > chunkSize {
+ chunkCount = int(task.fileSize/chunkSize) + 1
}
if chunkCount == 1 {
- return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
+ return worker.uploadFileAsOne(task, f)
}
- return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
+ return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
-func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
+func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
+ data, err := ioutil.ReadAll(f)
+ if err != nil {
+ return err
+ }
var chunks []*filer_pb.FileChunk
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
- if fi.Size() > 0 {
+ if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *copy.replication,
- Collection: *copy.collection,
- Ttl: *copy.ttl,
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
- fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
- uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, "")
+ uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
- fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- return false
+ return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
- fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- return false
+ return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: 0,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- })
+ chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: urlFolder,
+ Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
- Gid: uint32(os.Getgid()),
- Uid: uint32(os.Getuid()),
- FileSize: uint64(fi.Size()),
- FileMode: uint32(fi.Mode()),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *copy.replication,
- Collection: *copy.collection,
- TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
- return false
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- return true
+ return nil
}
-func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
- var chunks []*filer_pb.FileChunk
+ chunksChan := make(chan *filer_pb.FileChunk, chunkCount)
+
+ concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
+ var wg sync.WaitGroup
+ var uploadError error
+ var collection, replication string
+
+ fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
+ for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
+ wg.Add(1)
+ concurrentChunks <- struct{}{}
+ go func(i int64) {
+ defer func() {
+ wg.Done()
+ <-concurrentChunks
+ }()
+ // assign a volume
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- for i := int64(0); i < int64(chunkCount); i++ {
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ if collection == "" {
+ collection = assignResult.Collection
+ }
+ if replication == "" {
+ replication = assignResult.Replication
+ }
- // assign a volume
- assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *copy.replication,
- Collection: *copy.collection,
- Ttl: *copy.ttl,
- })
- if err != nil {
- fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
- }
+ uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
+ if err != nil {
+ uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ return
+ }
+ if uploadResult.Error != "" {
+ uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return
+ }
+ chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ }(i)
+ }
+ wg.Wait()
+ close(chunksChan)
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.LimitReader(f, chunkSize),
- false, "application/octet-stream", nil, "")
- if err != nil {
- fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- return false
- }
- if uploadResult.Error != "" {
- fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- return false
+ var chunks []*filer_pb.FileChunk
+ for chunk := range chunksChan {
+ chunks = append(chunks, chunk)
+ }
+
+ if uploadError != nil {
+ var fileIds []string
+ for _, chunk := range chunks {
+ fileIds = append(fileIds, chunk.FileId)
}
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- })
- fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds)
+ return uploadError
}
- if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: urlFolder,
+ Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
- Gid: uint32(os.Getgid()),
- Uid: uint32(os.Getuid()),
- FileSize: uint64(fi.Size()),
- FileMode: uint32(fi.Mode()),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *copy.replication,
- Collection: *copy.collection,
- TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
+ Replication: replication,
+ Collection: collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
- return false
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
- return true
+ return nil
}
func detectMimeType(f *os.File) string {
@@ -322,22 +493,12 @@ func detectMimeType(f *os.File) string {
}
if err != nil {
fmt.Printf("read head of %v: %v\n", f.Name(), err)
- return "application/octet-stream"
+ return ""
}
f.Seek(0, io.SeekStart)
mimeType := http.DetectContentType(head[:n])
- return mimeType
-}
-
-func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- grpcConnection, err := util.GrpcDial(filerAddress)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
+ if mimeType == "application/octet-stream" {
+ return ""
}
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
-
- return fn(client)
+ return mimeType
}
diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go
deleted file mode 100644
index 7a2e7920a..000000000
--- a/weed/command/filer_export.go
+++ /dev/null
@@ -1,187 +0,0 @@
-package command
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/notification"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/spf13/viper"
-)
-
-func init() {
- cmdFilerExport.Run = runFilerExport // break init cycle
-}
-
-var cmdFilerExport = &Command{
- UsageLine: "filer.export -sourceStore=mysql -targetStore=cassandra",
- Short: "export meta data in filer store",
- Long: `Iterate the file tree and export all metadata out
-
- Both source and target store:
- * should be a store name already specified in filer.toml
- * do not need to be enabled state
-
- If target store is empty, only the directory tree will be listed.
-
- If target store is "notification", the list of entries will be sent to notification.
- This is usually used to bootstrap filer replication to a remote system.
-
- `,
-}
-
-var (
- // filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree")
- filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml, default to currently enabled store")
- filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml, or \"notification\" to export all files to message queue")
- dir = cmdFilerExport.Flag.String("dir", "/", "only process files under this directory")
- dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size")
- dryRun = cmdFilerExport.Flag.Bool("dryRun", false, "not actually moving data")
- verboseFilerExport = cmdFilerExport.Flag.Bool("v", false, "verbose entry details")
-)
-
-type statistics struct {
- directoryCount int
- fileCount int
-}
-
-func runFilerExport(cmd *Command, args []string) bool {
-
- weed_server.LoadConfiguration("filer", true)
- config := viper.GetViper()
-
- var sourceStore, targetStore filer2.FilerStore
-
- for _, store := range filer2.Stores {
- if store.GetName() == *filerExportSourceStore || *filerExportSourceStore == "" && config.GetBool(store.GetName()+".enabled") {
- viperSub := config.Sub(store.GetName())
- if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize source store for %s: %+v",
- store.GetName(), err)
- } else {
- sourceStore = store
- }
- break
- }
- }
-
- for _, store := range filer2.Stores {
- if store.GetName() == *filerExportTargetStore {
- viperSub := config.Sub(store.GetName())
- if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize target store for %s: %+v",
- store.GetName(), err)
- } else {
- targetStore = store
- }
- break
- }
- }
-
- if sourceStore == nil {
- glog.Errorf("Failed to find source store %s", *filerExportSourceStore)
- println("existing data sources are:")
- for _, store := range filer2.Stores {
- println(" " + store.GetName())
- }
- return false
- }
-
- if targetStore == nil && *filerExportTargetStore != "" && *filerExportTargetStore != "notification" {
- glog.Errorf("Failed to find target store %s", *filerExportTargetStore)
- println("existing data sources are:")
- for _, store := range filer2.Stores {
- println(" " + store.GetName())
- }
- return false
- }
-
- stat := statistics{}
-
- var fn func(level int, entry *filer2.Entry) error
-
- if *filerExportTargetStore == "notification" {
- weed_server.LoadConfiguration("notification", false)
- v := viper.GetViper()
- notification.LoadConfiguration(v.Sub("notification"))
-
- fn = func(level int, entry *filer2.Entry) error {
- printout(level, entry)
- if *dryRun {
- return nil
- }
- return notification.Queue.SendMessage(
- string(entry.FullPath),
- &filer_pb.EventNotification{
- NewEntry: entry.ToProtoEntry(),
- },
- )
- }
- } else if targetStore == nil {
- fn = printout
- } else {
- fn = func(level int, entry *filer2.Entry) error {
- printout(level, entry)
- if *dryRun {
- return nil
- }
- return targetStore.InsertEntry(entry)
- }
- }
-
- doTraverse(&stat, sourceStore, filer2.FullPath(*dir), 0, fn)
-
- glog.Infof("processed %d directories, %d files", stat.directoryCount, stat.fileCount)
-
- return true
-}
-
-func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) {
-
- limit := *dirListLimit
- lastEntryName := ""
- for {
- entries, err := filerStore.ListDirectoryEntries(parentPath, lastEntryName, false, limit)
- if err != nil {
- break
- }
- for _, entry := range entries {
- if fnErr := fn(level, entry); fnErr != nil {
- glog.Errorf("failed to process entry: %s", entry.FullPath)
- }
- if entry.IsDirectory() {
- stat.directoryCount++
- doTraverse(stat, filerStore, entry.FullPath, level+1, fn)
- } else {
- stat.fileCount++
- }
- }
- if len(entries) < limit {
- break
- }
- }
-}
-
-func printout(level int, entry *filer2.Entry) error {
- for i := 0; i < level; i++ {
- if i == level-1 {
- print("+-")
- } else {
- print("| ")
- }
- }
- print(entry.FullPath.Name())
- if *verboseFilerExport {
- for _, chunk := range entry.Chunks {
- print("[")
- print(chunk.FileId)
- print(",")
- print(chunk.Offset)
- print(",")
- print(chunk.Size)
- print(")")
- }
- }
- println()
- return nil
-}
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 3384e4023..40f2b570b 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -12,7 +13,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
- "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
)
@@ -28,16 +29,17 @@ var cmdFilerReplicate = &Command{
filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination.
- Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters.
+ Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters.
`,
}
func runFilerReplicate(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("replication", true)
- weed_server.LoadConfiguration("notification", true)
- config := viper.GetViper()
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("replication", true)
+ util.LoadConfiguration("notification", true)
+ config := util.GetViper()
var notificationInput sub.NotificationInput
@@ -45,8 +47,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
- viperSub := config.Sub("notification." + input.GetName())
- if err := input.Initialize(viperSub); err != nil {
+ if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize notification input for %s: %+v",
input.GetName(), err)
}
@@ -64,10 +65,9 @@ func runFilerReplicate(cmd *Command, args []string) bool {
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
- sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
- if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
- fromDir := sourceConfig.GetString("directory")
- toDir := sinkConfig.GetString("directory")
+ if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
+ fromDir := config.GetString("source.filer.directory")
+ toDir := config.GetString("sink.filer.directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
@@ -77,8 +77,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
- viperSub := config.Sub("sink." + sk.GetName())
- if err := sk.Initialize(viperSub); err != nil {
+ if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
@@ -96,7 +95,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
return true
}
- replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink)
+ replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for {
key, m, err := notificationInput.ReceiveMessage()
@@ -115,14 +114,13 @@ func runFilerReplicate(cmd *Command, args []string) bool {
} else {
glog.V(1).Infof("modify: %s", key)
}
- if err = replicator.Replicate(key, m); err != nil {
+ if err = replicator.Replicate(context.Background(), key, m); err != nil {
glog.Errorf("replicate %s: %+v", key, err)
} else {
glog.V(1).Infof("replicated %s", key)
}
}
- return true
}
func validateOneEnabledInput(config *viper.Viper) {
diff --git a/weed/command/fix.go b/weed/command/fix.go
index a800978c6..223808f4b 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -7,6 +7,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -29,12 +32,12 @@ var (
)
type VolumeFileScanner4Fix struct {
- version storage.Version
- nm *storage.NeedleMap
+ version needle.Version
+ nm *needle_map.MemDb
}
-func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -42,14 +45,14 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
return false
}
-func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *storage.Needle, offset int64) error {
- glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
- if n.Size > 0 {
- pe := scanner.nm.Put(n.Id, types.Offset(offset/types.NeedlePaddingSize), n.Size)
+func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
+ if n.Size > 0 && n.Size != types.TombstoneFileSize {
+ pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
- return scanner.nm.Delete(n.Id, types.Offset(offset/types.NeedlePaddingSize))
+ return scanner.nm.Delete(n.Id)
}
return nil
}
@@ -65,23 +68,22 @@ func runFix(cmd *Command, args []string) bool {
baseFileName = *fixVolumeCollection + "_" + baseFileName
}
indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
- indexFile, err := os.OpenFile(indexFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- nm := storage.NewBtreeNeedleMap(indexFile)
+ nm := needle_map.NewMemDb()
defer nm.Close()
- vid := storage.VolumeId(*fixVolumeId)
+ vid := needle.VolumeId(*fixVolumeId)
scanner := &VolumeFileScanner4Fix{
nm: nm,
}
- err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner)
- if err != nil {
- glog.Fatalf("Export Volume File [ERROR] %s\n", err)
+ if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
+ glog.Fatalf("scan .dat File: %v", err)
+ os.Remove(indexFileName)
+ }
+
+ if err := nm.SaveToIdx(indexFileName); err != nil {
+ glog.Fatalf("save to .idx File: %v", err)
os.Remove(indexFileName)
}
diff --git a/weed/command/master.go b/weed/command/master.go
index bd2267b9e..21c759f4e 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -6,120 +6,149 @@ import (
"runtime"
"strconv"
"strings"
- "time"
+
+ "github.com/chrislusf/raft/protobuf"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
)
+var (
+ m MasterOptions
+)
+
+type MasterOptions struct {
+ port *int
+ ip *string
+ ipBind *string
+ metaFolder *string
+ peers *string
+ volumeSizeLimitMB *uint
+ volumePreallocate *bool
+ // pulseSeconds *int
+ defaultReplication *string
+ garbageThreshold *float64
+ whiteList *string
+ disableHttp *bool
+ metricsAddress *string
+ metricsIntervalSec *int
+}
+
func init() {
cmdMaster.Run = runMaster // break init cycle
+ m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
+ m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address")
+ m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
+ m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
+ m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
+ m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
+ // m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
+ m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
+ m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
+ m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address")
+ m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
}
var cmdMaster = &Command{
UsageLine: "master -port=9333",
Short: "start a master server",
- Long: `start a master server to provide volume=>location mapping service
- and sequence number of file ids
+ Long: `start a master server to provide volume=>location mapping service and sequence number of file ids
+
+ The configuration file "security.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order.
+
+ The example security.toml configuration file can be generated by "weed scaffold -config=security"
`,
}
var (
- mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
- mGrpcPort = cmdMaster.Flag.Int("port.grpc", 0, "grpc server listen port, default to http port + 10000")
- masterIp = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address")
- masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
- metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
- masterPeers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
- volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
- volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
- mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
- // mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds")
- mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
- masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
- masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
- masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")
- masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file")
-
- masterWhiteList []string
+ masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")
+ masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file")
)
func runMaster(cmd *Command, args []string) bool {
- if *mMaxCpu < 1 {
- *mMaxCpu = runtime.NumCPU()
- }
- runtime.GOMAXPROCS(*mMaxCpu)
- util.SetupProfiling(*masterCpuProfile, *masterMemProfile)
- if err := util.TestFolderWritable(*metaFolder); err != nil {
- glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err)
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("master", false)
+
+ runtime.GOMAXPROCS(runtime.NumCPU())
+ grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
+
+ if err := util.TestFolderWritable(*m.metaFolder); err != nil {
+ glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
}
- if *masterWhiteListOption != "" {
- masterWhiteList = strings.Split(*masterWhiteListOption, ",")
+
+ var masterWhiteList []string
+ if *m.whiteList != "" {
+ masterWhiteList = strings.Split(*m.whiteList, ",")
}
- if *volumeSizeLimitMB > 30*1000 {
+ if *m.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
- r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
- *volumeSizeLimitMB, *volumePreallocate,
- *mpulse, *defaultReplicaPlacement, *garbageThreshold,
- masterWhiteList, *masterSecureKey,
- )
+ startMaster(m, masterWhiteList)
+
+ return true
+}
- listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport)
+func startMaster(masterOption MasterOptions, masterWhiteList []string) {
- glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
+ backend.LoadConfiguration(util.GetViper())
+ myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
+
+ r := mux.NewRouter()
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
+ listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
+ glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
+ // start raftServer
+ raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
+ peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, 5)
+ if raftServer == nil {
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
+ }
+ ms.SetRaftServer(raftServer)
+ r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
+ // starting grpc server
+ grpcPort := *masterOption.port + 10000
+ grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0)
+ if err != nil {
+ glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
+ }
+ // Create your protocol servers.
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
+ master_pb.RegisterSeaweedServer(grpcS, ms)
+ protobuf.RegisterRaftServer(grpcS, raftServer)
+ reflection.Register(grpcS)
+ glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
+ go grpcS.Serve(grpcL)
- go func() {
- time.Sleep(100 * time.Millisecond)
- myMasterAddress, peers := checkPeers(*masterIp, *mport, *masterPeers)
- raftServer := weed_server.NewRaftServer(r, peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse)
- ms.SetRaftServer(raftServer)
- }()
-
- go func() {
- // starting grpc server
- grpcPort := *mGrpcPort
- if grpcPort == 0 {
- grpcPort = *mport + 10000
- }
- grpcL, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0)
- if err != nil {
- glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
- }
- // Create your protocol servers.
- grpcS := util.NewGrpcServer()
- master_pb.RegisterSeaweedServer(grpcS, ms)
- reflection.Register(grpcS)
-
- glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort)
- grpcS.Serve(grpcL)
- }()
+ go ms.MasterClient.KeepConnectedToMaster()
// start http server
httpS := &http.Server{Handler: r}
- if err := httpS.Serve(masterListener); err != nil {
- glog.Fatalf("master server failed to serve: %v", err)
- }
+ go httpS.Serve(masterListener)
- return true
+ select {}
}
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
+ glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
masterAddress = masterIp + ":" + strconv.Itoa(masterPort)
if peers != "" {
cleanedPeers = strings.Split(peers, ",")
@@ -133,12 +162,28 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
}
}
- peerCount := len(cleanedPeers)
if !hasSelf {
- peerCount += 1
+ cleanedPeers = append(cleanedPeers, masterAddress)
}
- if peerCount%2 == 0 {
+ if len(cleanedPeers)%2 == 0 {
glog.Fatalf("Only odd number of masters are supported!")
}
return
}
+
+func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
+ return &weed_server.MasterOption{
+ Host: *m.ip,
+ Port: *m.port,
+ MetaFolder: *m.metaFolder,
+ VolumeSizeLimitMB: *m.volumeSizeLimitMB,
+ VolumePreallocate: *m.volumePreallocate,
+ // PulseSeconds: *m.pulseSeconds,
+ DefaultReplicaPlacement: *m.defaultReplication,
+ GarbageThreshold: *m.garbageThreshold,
+ WhiteList: whiteList,
+ DisableHttp: *m.disableHttp,
+ MetricsAddress: *m.metricsAddress,
+ MetricsIntervalSec: *m.metricsIntervalSec,
+ }
+}
diff --git a/weed/command/mount.go b/weed/command/mount.go
index e61f16783..440aca8c6 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,22 +1,26 @@
package command
import (
- "fmt"
- "strconv"
- "strings"
+ "os"
)
type MountOptions struct {
- filer *string
- filerGrpcPort *int
- filerMountRootPath *string
- dir *string
- dirListingLimit *int
- collection *string
- replication *string
- ttlSec *int
- chunkSizeLimitMB *int
- dataCenter *string
+ filer *string
+ filerMountRootPath *string
+ dir *string
+ dirAutoCreate *bool
+ dirListCacheLimit *int64
+ collection *string
+ replication *string
+ ttlSec *int
+ chunkSizeLimitMB *int
+ cacheDir *string
+ cacheSizeMB *int64
+ dataCenter *string
+ allowOthers *bool
+ umaskString *string
+ nonempty *bool
+ outsideContainerClusterMode *bool
}
var (
@@ -28,17 +32,23 @@ var (
func init() {
cmdMount.Run = runMount // break init cycle
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
- mountOptions.filerGrpcPort = cmdMount.Flag.Int("filer.grpc.port", 0, "filer grpc server listen port, default to http port + 10000")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
- mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size")
+ mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
+ mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
- mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
+ mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files")
+ mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
+ mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
+ mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
+ mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
+ mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
+ mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
}
var cmdMount = &Command{
@@ -56,24 +66,11 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
- `,
-}
-
-func parseFilerGrpcAddress(filer string, optionalGrpcPort int) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort)
- }
+ If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose,
+ the volume servers are not accessible by their own ip addresses.
+ In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
+ * All volume server containers are accessible through the same hostname or IP address as the filer.
+ * All volume server container ports are open external to the cluster.
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("The filer filer port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
- if optionalGrpcPort != 0 {
- filerGrpcPort = optionalGrpcPort
- }
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
+ `,
}
diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go
new file mode 100644
index 000000000..f0a5581e7
--- /dev/null
+++ b/weed/command/mount_darwin.go
@@ -0,0 +1,13 @@
+package command
+
+import (
+ "github.com/seaweedfs/fuse"
+)
+
+func osSpecificMountOptions() []fuse.MountOption {
+ return []fuse.MountOption{}
+}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go
new file mode 100644
index 000000000..f0a5581e7
--- /dev/null
+++ b/weed/command/mount_freebsd.go
@@ -0,0 +1,13 @@
+package command
+
+import (
+ "github.com/seaweedfs/fuse"
+)
+
+func osSpecificMountOptions() []fuse.MountOption {
+ return []fuse.MountOption{}
+}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
new file mode 100644
index 000000000..25c4f72cf
--- /dev/null
+++ b/weed/command/mount_linux.go
@@ -0,0 +1,155 @@
+package command
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+
+ "github.com/seaweedfs/fuse"
+)
+
+const (
+ /* 36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue
+ (1)(2)(3) (4) (5) (6) (7) (8) (9) (10) (11)
+
+ (1) mount ID: unique identifier of the mount (may be reused after umount)
+ (2) parent ID: ID of parent (or of self for the top of the mount tree)
+ (3) major:minor: value of st_dev for files on filesystem
+ (4) root: root of the mount within the filesystem
+ (5) mount point: mount point relative to the process's root
+ (6) mount options: per mount options
+ (7) optional fields: zero or more fields of the form "tag[:value]"
+ (8) separator: marks the end of the optional fields
+ (9) filesystem type: name of filesystem of the form "type[.subtype]"
+ (10) mount source: filesystem specific information or "none"
+ (11) super options: per super block options*/
+ mountinfoFormat = "%d %d %d:%d %s %s %s %s"
+)
+
+// Info reveals information about a particular mounted filesystem. This
+// struct is populated from the content in the /proc/<pid>/mountinfo file.
+type Info struct {
+ // ID is a unique identifier of the mount (may be reused after umount).
+ ID int
+
+ // Parent indicates the ID of the mount parent (or of self for the top of the
+ // mount tree).
+ Parent int
+
+ // Major indicates one half of the device ID which identifies the device class.
+ Major int
+
+ // Minor indicates one half of the device ID which identifies a specific
+ // instance of device.
+ Minor int
+
+ // Root of the mount within the filesystem.
+ Root string
+
+ // Mountpoint indicates the mount point relative to the process's root.
+ Mountpoint string
+
+ // Opts represents mount-specific options.
+ Opts string
+
+ // Optional represents optional fields.
+ Optional string
+
+ // Fstype indicates the type of filesystem, such as EXT3.
+ Fstype string
+
+ // Source indicates filesystem specific information or "none".
+ Source string
+
+ // VfsOpts represents per super block options.
+ VfsOpts string
+}
+
+// Mounted determines if a specified mountpoint has been mounted.
+// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab.
+func mounted(mountPoint string) (bool, error) {
+ entries, err := parseMountTable()
+ if err != nil {
+ return false, err
+ }
+
+ // Search the table for the mountPoint
+ for _, e := range entries {
+ if e.Mountpoint == mountPoint {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// Parse /proc/self/mountinfo because comparing Dev and ino does not work from
+// bind mounts
+func parseMountTable() ([]*Info, error) {
+ f, err := os.Open("/proc/self/mountinfo")
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ return parseInfoFile(f)
+}
+
+func parseInfoFile(r io.Reader) ([]*Info, error) {
+ var (
+ s = bufio.NewScanner(r)
+ out []*Info
+ )
+
+ for s.Scan() {
+ if err := s.Err(); err != nil {
+ return nil, err
+ }
+
+ var (
+ p = &Info{}
+ text = s.Text()
+ optionalFields string
+ )
+
+ if _, err := fmt.Sscanf(text, mountinfoFormat,
+ &p.ID, &p.Parent, &p.Major, &p.Minor,
+ &p.Root, &p.Mountpoint, &p.Opts, &optionalFields); err != nil {
+ return nil, fmt.Errorf("Scanning '%s' failed: %s", text, err)
+ }
+ // Safe as mountinfo encodes mountpoints with spaces as \040.
+ index := strings.Index(text, " - ")
+ postSeparatorFields := strings.Fields(text[index+3:])
+ if len(postSeparatorFields) < 3 {
+ return nil, fmt.Errorf("Error found less than 3 fields post '-' in %q", text)
+ }
+
+ if optionalFields != "-" {
+ p.Optional = optionalFields
+ }
+
+ p.Fstype = postSeparatorFields[0]
+ p.Source = postSeparatorFields[1]
+ p.VfsOpts = strings.Join(postSeparatorFields[2:], " ")
+ out = append(out, p)
+ }
+ return out, nil
+}
+
+func osSpecificMountOptions() []fuse.MountOption {
+ return []fuse.MountOption{}
+}
+
+func checkMountPointAvailable(dir string) bool {
+ mountPoint := dir
+ if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {
+ mountPoint = mountPoint[0 : len(mountPoint)-1]
+ }
+
+ if mounted, err := mounted(mountPoint); err != nil || mounted {
+ return false
+ }
+
+ return true
+}
diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go
index 3bf22ddc4..f3c0de3d6 100644
--- a/weed/command/mount_notsupported.go
+++ b/weed/command/mount_notsupported.go
@@ -1,5 +1,6 @@
// +build !linux
// +build !darwin
+// +build !freebsd
package command
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 2937b9ef1..915754166 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -1,11 +1,13 @@
-// +build linux darwin
+// +build linux darwin freebsd
package command
import (
+ "context"
"fmt"
"os"
"os/user"
+ "path"
"runtime"
"strconv"
"strings"
@@ -13,104 +15,185 @@ import (
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
func runMount(cmd *Command, args []string) bool {
- fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
- if *mountOptions.dir == "" {
+
+ grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
+
+ umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
+ if umaskErr != nil {
+ fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
+ return false
+ }
+
+ if len(args) > 0 {
+ return false
+ }
+
+ return RunMount(&mountOptions, os.FileMode(umask))
+}
+
+func RunMount(option *MountOptions, umask os.FileMode) bool {
+
+ filer := *option.filer
+ // parse filer grpc address
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
+ if err != nil {
+ glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
+ return true
+ }
+
+ // try to connect to filer, filerBucketsPath may be useful later
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var cipher bool
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ return true
+ }
+
+ filerMountRootPath := *option.filerMountRootPath
+ dir := *option.dir
+ chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
+
+ util.LoadConfiguration("security", false)
+
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
+ if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
- if *mountOptions.chunkSizeLimitMB <= 0 {
+ if chunkSizeLimitMB <= 0 {
fmt.Printf("Please specify a reasonable buffer size.")
return false
}
- fuse.Unmount(*mountOptions.dir)
+ fuse.Unmount(dir)
+
+ uid, gid := uint32(0), uint32(0)
// detect mount folder mode
+ if *option.dirAutoCreate {
+ os.MkdirAll(dir, 0755)
+ }
mountMode := os.ModeDir | 0755
- if fileInfo, err := os.Stat(*mountOptions.dir); err == nil {
+ fileInfo, err := os.Stat(dir)
+ if err == nil {
mountMode = os.ModeDir | fileInfo.Mode()
+ uid, gid = util.GetFileUidGid(fileInfo)
+ fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode())
}
- // detect current user
- uid, gid := uint32(0), uint32(0)
- if u, err := user.Current(); err == nil {
- if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
- uid = uint32(parsedId)
- }
- if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
- gid = uint32(parsedId)
+ if uid == 0 {
+ if u, err := user.Current(); err == nil {
+ if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
+ uid = uint32(parsedId)
+ }
+ if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
+ gid = uint32(parsedId)
+ }
+ fmt.Printf("current uid=%d gid=%d\n", uid, gid)
}
}
- util.SetupProfiling(*mountCpuProfile, *mountMemProfile)
+ // Ensure target mount point availability
+ if isValid := checkMountPointAvailable(dir); !isValid {
+ glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
+ return true
+ }
- c, err := fuse.Mount(
- *mountOptions.dir,
- fuse.VolumeName("SeaweedFS"),
- fuse.FSName("SeaweedFS"),
- fuse.Subtype("SeaweedFS"),
- fuse.NoAppleDouble(),
+ mountName := path.Base(dir)
+
+ options := []fuse.MountOption{
+ fuse.VolumeName(mountName),
+ fuse.FSName(filer + ":" + filerMountRootPath),
+ fuse.Subtype("seaweedfs"),
+ // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
fuse.NoAppleXattr(),
fuse.NoBrowse(),
fuse.AutoXattr(),
fuse.ExclCreate(),
fuse.DaemonTimeout("3600"),
- fuse.AllowOther(),
fuse.AllowSUID(),
fuse.DefaultPermissions(),
- fuse.MaxReadahead(1024*128),
+ fuse.MaxReadahead(1024 * 128),
fuse.AsyncRead(),
fuse.WritebackCache(),
- )
- if err != nil {
- glog.Fatal(err)
- return false
}
- util.OnInterrupt(func() {
- fuse.Unmount(*mountOptions.dir)
- c.Close()
- })
-
- filerGrpcAddress, err := parseFilerGrpcAddress(*mountOptions.filer, *mountOptions.filerGrpcPort)
- if err != nil {
- glog.Fatal(err)
- return false
+ options = append(options, osSpecificMountOptions()...)
+ if *option.allowOthers {
+ options = append(options, fuse.AllowOther())
+ }
+ if *option.nonempty {
+ options = append(options, fuse.AllowNonEmptyMount())
}
- mountRoot := *mountOptions.filerMountRootPath
+ // find mount point
+ mountRoot := filerMountRootPath
if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
- err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{
- FilerGrpcAddress: filerGrpcAddress,
- FilerMountRootPath: mountRoot,
- Collection: *mountOptions.collection,
- Replication: *mountOptions.replication,
- TtlSec: int32(*mountOptions.ttlSec),
- ChunkSizeLimit: int64(*mountOptions.chunkSizeLimitMB) * 1024 * 1024,
- DataCenter: *mountOptions.dataCenter,
- DirListingLimit: *mountOptions.dirListingLimit,
- EntryCacheTtl: 3 * time.Second,
- MountUid: uid,
- MountGid: gid,
- MountMode: mountMode,
- }))
+ seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
+ FilerGrpcAddress: filerGrpcAddress,
+ GrpcDialOption: grpcDialOption,
+ FilerMountRootPath: mountRoot,
+ Collection: *option.collection,
+ Replication: *option.replication,
+ TtlSec: int32(*option.ttlSec),
+ ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
+ CacheDir: *option.cacheDir,
+ CacheSizeMB: *option.cacheSizeMB,
+ DataCenter: *option.dataCenter,
+ DirListCacheLimit: *option.dirListCacheLimit,
+ EntryCacheTtl: 3 * time.Second,
+ MountUid: uid,
+ MountGid: gid,
+ MountMode: mountMode,
+ MountCtime: fileInfo.ModTime(),
+ MountMtime: time.Now(),
+ Umask: umask,
+ OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
+ Cipher: cipher,
+ })
+
+ // mount
+ c, err := fuse.Mount(dir, options...)
if err != nil {
- fuse.Unmount(*mountOptions.dir)
+ glog.V(0).Infof("mount: %v", err)
+ return true
}
+ defer fuse.Unmount(dir)
+
+ grace.OnInterrupt(func() {
+ fuse.Unmount(dir)
+ c.Close()
+ })
+
+ glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir)
+ err = fs.Serve(c, seaweedFileSystem)
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount process: %v", err)
+ return true
}
return true
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
new file mode 100644
index 000000000..b4b5855ff
--- /dev/null
+++ b/weed/command/msg_broker.go
@@ -0,0 +1,114 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ messageBrokerStandaloneOptions MessageBrokerOptions
+)
+
+type MessageBrokerOptions struct {
+ filer *string
+ ip *string
+ port *int
+ cpuprofile *string
+ memprofile *string
+}
+
+func init() {
+ cmdMsgBroker.Run = runMsgBroker // break init cycle
+ messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
+ messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
+ messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
+ messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
+}
+
+var cmdMsgBroker = &Command{
+ UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
+ Short: "start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMsgBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return messageBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
+
+ grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
+
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
+ cipher := false
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
+ Filers: []string{*msgBrokerOpt.filer},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Ip: *msgBrokerOpt.ip,
+ Port: *msgBrokerOpt.port,
+ Cipher: cipher,
+ }, grpcDialOption)
+
+ // start grpc listener
+ grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 16a9490ff..7ebd4fab0 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -1,64 +1,161 @@
package command
import (
+ "context"
+ "fmt"
"net/http"
"time"
- "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+
+ "github.com/gorilla/mux"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
)
var (
- s3options S3Options
+ s3StandaloneOptions S3Options
)
type S3Options struct {
- filer *string
- filerGrpcPort *int
- filerBucketsPath *string
- port *int
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
}
func init() {
cmdS3.Run = runS3 // break init cycle
- s3options.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
- s3options.filerGrpcPort = cmdS3.Flag.Int("filer.grpcPort", 0, "filer server grpc port, default to filer http port plus 10000")
- s3options.filerBucketsPath = cmdS3.Flag.String("filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
- s3options.port = cmdS3.Flag.Int("port", 8333, "s3options server http listen port")
- s3options.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}")
- s3options.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
- s3options.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
+ s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
+ s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
+ s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
+ s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
+ s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
}
var cmdS3 = &Command{
- UsageLine: "s3 -port=8333 -filer=<ip:port>",
+ UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
Short: "start a s3 API compatible server that is backed by a filer",
Long: `start a s3 API compatible server that is backed by a filer.
+ By default, you can use any access key and secret key to access the S3 APIs.
+ To enable credential based access, create a config.json file similar to this:
+
+{
+ "identities": [
+ {
+ "name": "some_name",
+ "credentials": [
+ {
+ "accessKey": "some_access_key1",
+ "secretKey": "some_secret_key1"
+ }
+ ],
+ "actions": [
+ "Admin",
+ "Read",
+ "Write"
+ ]
+ },
+ {
+ "name": "some_read_only_user",
+ "credentials": [
+ {
+ "accessKey": "some_access_key2",
+ "secretKey": "some_secret_key2"
+ }
+ ],
+ "actions": [
+ "Read"
+ ]
+ },
+ {
+ "name": "some_normal_user",
+ "credentials": [
+ {
+ "accessKey": "some_access_key3",
+ "secretKey": "some_secret_key3"
+ }
+ ],
+ "actions": [
+ "Read",
+ "Write"
+ ]
+ },
+ {
+ "name": "user_limited_to_bucket1",
+ "credentials": [
+ {
+ "accessKey": "some_access_key4",
+ "secretKey": "some_secret_key4"
+ }
+ ],
+ "actions": [
+ "Read:bucket1",
+ "Write:bucket1"
+ ]
+ }
+ ]
+}
+
`,
}
func runS3(cmd *Command, args []string) bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*s3options.filer, *s3options.filerGrpcPort)
+ util.LoadConfiguration("security", false)
+
+ return s3StandaloneOptions.startS3Server()
+
+}
+
+func (s3opt *S3Options) startS3Server() bool {
+
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
}
+ filerBucketsPath := "/buckets"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerBucketsPath = resp.DirBuckets
+ glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
- Filer: *s3options.filer,
+ Filer: *s3opt.filer,
FilerGrpcAddress: filerGrpcAddress,
- DomainName: *s3options.domainName,
- BucketsPath: *s3options.filerBucketsPath,
+ Config: *s3opt.config,
+ DomainName: *s3opt.domainName,
+ BucketsPath: filerBucketsPath,
+ GrpcDialOption: grpcDialOption,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
@@ -66,22 +163,22 @@ func runS3(cmd *Command, args []string) bool {
httpS := &http.Server{Handler: router}
- listenAddress := fmt.Sprintf(":%d", *s3options.port)
+ listenAddress := fmt.Sprintf(":%d", *s3opt.port)
s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
if err != nil {
glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
}
- if *s3options.tlsPrivateKey != "" {
- if err = httpS.ServeTLS(s3ApiListener, *s3options.tlsCertificate, *s3options.tlsPrivateKey); err != nil {
+ if *s3opt.tlsPrivateKey != "" {
+ glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
+ if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
- glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.VERSION, *s3options.port)
} else {
+ glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
if err = httpS.Serve(s3ApiListener); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
- glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.VERSION, *s3options.port)
}
return true
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index ec0723859..b199f2d2d 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -10,16 +10,24 @@ func init() {
}
var cmdScaffold = &Command{
- UsageLine: "scaffold [filer]",
+ UsageLine: "scaffold -config=[filer|notification|replication|security|master]",
Short: "generate basic configuration files",
Long: `Generate filer.toml with all possible configurations for you to customize.
+ The options can also be overwritten by environment variables.
+ For example, the filer.toml mysql password can be overwritten by environment variable
+ export WEED_MYSQL_PASSWORD=some_password
+ Environment variable rules:
+ * Prefix the variable name with "WEED_"
+ * Upppercase the reset of variable name.
+ * Replace '.' with '_'
+
`,
}
var (
outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory")
- config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication] the configuration file to generate")
+ config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master] the configuration file to generate")
)
func runScaffold(cmd *Command, args []string) bool {
@@ -32,6 +40,10 @@ func runScaffold(cmd *Command, args []string) bool {
content = NOTIFICATION_TOML_EXAMPLE
case "replication":
content = REPLICATION_TOML_EXAMPLE
+ case "security":
+ content = SECURITY_TOML_EXAMPLE
+ case "master":
+ content = MASTER_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@@ -55,27 +67,39 @@ const (
# $HOME/.seaweedfs/filer.toml
# /etc/seaweedfs/filer.toml
-[memory]
-# local in memory, mostly for testing purpose
-enabled = false
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer would check whether a folder is empty.
+# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
+recursive_delete = false
+# directories under this folder will be automatically creating a separate bucket
+buckets_folder = "/buckets"
+buckets_fsync = [ # a list of buckets with all write requests fsync=true
+ "important_bucket",
+ "should_always_fsync",
+]
+
+####################################################
+# The following are filer store options
+####################################################
-[leveldb]
+[leveldb2]
# local on disk, mostly for simple single-machine setup, fairly scalable
+# faster than previous leveldb, recommended.
enabled = true
dir = "." # directory to store level db files
-####################################################
-# multiple filers on shared storage, fairly scalable
-####################################################
-
-[mysql]
+[mysql] # or tidb
# CREATE TABLE IF NOT EXISTS filemeta (
-# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
-# name VARCHAR(1000) COMMENT 'directory or file name',
-# directory VARCHAR(4096) COMMENT 'full path to parent directory',
-# meta BLOB,
+# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+# name VARCHAR(1000) COMMENT 'directory or file name',
+# directory TEXT COMMENT 'full path to parent directory',
+# meta LONGBLOB,
# PRIMARY KEY (dirhash, name)
# ) DEFAULT CHARSET=utf8;
+
enabled = false
hostname = "localhost"
port = 3306
@@ -84,12 +108,13 @@ password = ""
database = "" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
+interpolateParams = false
-[postgres]
+[postgres] # or cockroachdb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT,
-# name VARCHAR(1000),
-# directory VARCHAR(4096),
+# name VARCHAR(65535),
+# directory VARCHAR(65535),
# meta bytea,
# PRIMARY KEY (dirhash, name)
# );
@@ -116,13 +141,13 @@ hosts=[
"localhost:9042",
]
-[redis]
+[redis2]
enabled = false
address = "localhost:6379"
password = ""
-db = 0
+database = 0
-[redis_cluster]
+[redis_cluster2]
enabled = false
addresses = [
"localhost:30001",
@@ -132,7 +157,22 @@ addresses = [
"localhost:30005",
"localhost:30006",
]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = true
+# automatically use the closest Redis server for reads
+routeByLatency = true
+
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+[mongodb]
+enabled = false
+uri = "mongodb://localhost:27017"
+option_pool_size = 0
+database = "seaweedfs"
`
NOTIFICATION_TOML_EXAMPLE = `
@@ -178,6 +218,17 @@ google_application_credentials = "/path/to/x.json" # path to json credential fil
project_id = "" # an existing project id
topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = false
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin.
+topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
`
REPLICATION_TOML_EXAMPLE = `
@@ -194,28 +245,29 @@ grpcAddress = "localhost:18888"
# all files under this directory tree are replicated.
# this is not a directory on your hard drive, but on your filer.
# i.e., all files with this "prefix" are sent to notification message queue.
-directory = "/buckets"
+directory = "/buckets"
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
+# this is not a directory on your hard drive, but on your filer.
# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
+directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
+# default loads credentials from the shared credentials file (~/.aws/credentials).
enabled = false
aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
+endpoint = ""
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@@ -240,4 +292,127 @@ bucket = "mybucket" # an existing bucket
directory = "/" # destination directory
`
+
+ SECURITY_TOML_EXAMPLE = `
+# Put this file to one of the location, with descending priority
+# ./security.toml
+# $HOME/.seaweedfs/security.toml
+# /etc/seaweedfs/security.toml
+# this file is read by master, volume server, and filer
+
+# the jwt signing key is read by master and volume server.
+# a jwt defaults to expire after 10 seconds.
+[jwt.signing]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# jwt for read is only supported with master+volume setup. Filer does not support this mode.
+[jwt.signing.read]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# all grpc tls authentications are mutual
+# the values for the following ca, cert, and key are paths to the PERM files.
+# the host name is not checked, so the PERM files can be shared.
+[grpc]
+ca = ""
+
+[grpc.volume]
+cert = ""
+key = ""
+
+[grpc.master]
+cert = ""
+key = ""
+
+[grpc.filer]
+cert = ""
+key = ""
+
+[grpc.msg_broker]
+cert = ""
+key = ""
+
+# use this for any place needs a grpc client
+# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
+
+# volume server https options
+# Note: work in progress!
+# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
+[https.client]
+enabled = true
+[https.volume]
+cert = ""
+key = ""
+
+
+`
+
+ MASTER_TOML_EXAMPLE = `
+# Put this file to one of the location, with descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+# periodically run these scripts are the same as running them from 'weed shell'
+scripts = """
+ lock
+ ec.encode -fullPercent=95 -quietFor=1h
+ ec.rebuild -force
+ ec.balance -force
+ volume.balance -force
+ volume.fix.replication
+ unlock
+"""
+sleep_minutes = 17 # sleep minutes between each script execution
+
+[master.filer]
+default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
+
+
+[master.sequencer]
+type = "memory" # Choose [memory|etcd] type for storing the file id sequence
+# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+ [storage.backend.s3.default]
+ enabled = false
+ aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ region = "us-east-2"
+ bucket = "your_bucket_name" # an existing bucket
+ endpoint = ""
+
+# create this number of logical volumes if no more writable volumes
+# count_x means how many copies of data.
+# e.g.:
+# 000 has only one copy, copy_1
+# 010 and 001 has two copies, copy_2
+# 011 has only 3 copies, copy_3
+[master.volume_growth]
+copy_1 = 7 # create 1 x 7 = 7 actual volumes
+copy_2 = 6 # create 2 x 6 = 12 actual volumes
+copy_3 = 3 # create 3 x 3 = 9 actual volumes
+copy_other = 1 # create n x 1 = n actual volumes
+
+# configuration flags for replication
+[master.replication]
+# any replication counts should be considered minimums. If you specify 010 and
+# have 3 different racks, that's still considered writable. Writes will still
+# try to replicate to all available volumes. You should only use this option
+# if you are doing your own replication or periodic sync of volumes.
+treat_replication_as_minimums = false
+
+`
)
diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go
new file mode 100644
index 000000000..423dacc32
--- /dev/null
+++ b/weed/command/scaffold_test.go
@@ -0,0 +1,44 @@
+package command
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/spf13/viper"
+)
+
+func TestReadingTomlConfiguration(t *testing.T) {
+
+ viper.SetConfigType("toml")
+
+ // any approach to require this configuration into your program.
+ var tomlExample = []byte(`
+[database]
+server = "192.168.1.1"
+ports = [ 8001, 8001, 8002 ]
+connection_max = 5000
+enabled = true
+
+[servers]
+
+ # You can indent as you please. Tabs or spaces. TOML don't care.
+ [servers.alpha]
+ ip = "10.0.0.1"
+ dc = "eqdc10"
+
+ [servers.beta]
+ ip = "10.0.0.2"
+ dc = "eqdc10"
+
+`)
+
+ viper.ReadConfig(bytes.NewBuffer(tomlExample))
+
+ fmt.Printf("database is %v\n", viper.Get("database"))
+ fmt.Printf("servers is %v\n", viper.GetStringMap("servers"))
+
+ alpha := viper.Sub("servers.alpha")
+
+ fmt.Printf("alpha ip is %v\n", alpha.GetString("ip"))
+}
diff --git a/weed/command/server.go b/weed/command/server.go
index ba5305a97..443f041c5 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -1,21 +1,15 @@
package command
import (
- "net/http"
+ "fmt"
"os"
"runtime"
"runtime/pprof"
- "strconv"
"strings"
- "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
)
type ServerOptions struct {
@@ -24,8 +18,11 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- filerOptions FilerOptions
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ msgBrokerOptions MessageBrokerOptions
)
func init() {
@@ -33,70 +30,90 @@ func init() {
}
var cmdServer = &Command{
- UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name",
- Short: "start a server, including volume server, and automatically elect a master server",
+ UsageLine: "server -dir=/tmp -volume.max=5 -ip=server_name",
+ Short: "start a master server, a volume server, and optionally a filer and a S3 gateway",
Long: `start both a volume server to provide storage spaces
and a master server to provide volume=>location mapping service and sequence number of file ids
This is provided as a convenient way to start both volume server and master server.
- The servers are exactly the same as starting them separately.
+ The servers acts exactly the same as starting them separately.
+ So other volume servers can connect to this master server also.
- So other volume servers can use this embedded master server also.
-
- Optionally, one filer server can be started. Logically, filer servers should not be in a cluster.
- They run with meta data on disk, not shared. So each filer server is different.
+ Optionally, a filer server can be started.
+ Also optionally, a S3 gateway can be started.
`,
}
var (
- serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
- serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
- serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
- serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
- serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
- serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
- serverPeers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
- serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
- serverGarbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
- masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
- masterGrpcPort = cmdServer.Flag.Int("master.port.grpc", 0, "master grpc server listen port, default to http port + 10000")
- masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
- masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
- masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
- masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
- volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
- pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
+ serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
+ serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
+ serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
+ serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
+ serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
+ volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
+ volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.")
+ volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly.")
+
+ // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
+ isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
+
+ False = false
)
func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
+
+ masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
+ masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
+ masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
+ masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
+ masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
+ masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "000", "Default replication type if not specified.")
+ masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
+ masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
+ masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
- filerOptions.grpcPort = cmdServer.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to http port + 10000")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
- filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
+ filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
+ filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
- serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
- serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
+ serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+ serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
+ serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
+ serverOptions.v.pprof = &False
+
+ s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
+ s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
+ s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
+ s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+
+ msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
}
func runServer(cmd *Command, args []string) bool {
- filerOptions.secretKey = serverSecureKey
+
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("master", false)
+
if *serverOptions.cpuprofile != "" {
f, err := os.Create(*serverOptions.cpuprofile)
if err != nil {
@@ -106,45 +123,62 @@ func runServer(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- if *filerOptions.redirectOnRead {
+ if *isStartingS3 {
+ *isStartingFiler = true
+ }
+ if *isStartingMsgBroker {
*isStartingFiler = true
}
- master := *serverIp + ":" + strconv.Itoa(*masterPort)
+ _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
+ peers := strings.Join(peerList, ",")
+ masterOptions.peers = &peers
+
+ masterOptions.ip = serverIp
+ masterOptions.ipBind = serverBindIp
+ filerOptions.masters = &peers
filerOptions.ip = serverIp
+ filerOptions.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
- serverOptions.v.masters = &master
+ serverOptions.v.masters = &peers
serverOptions.v.idleConnectionTimeout = serverTimeout
- serverOptions.v.maxCpu = serverMaxCpu
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
- serverOptions.v.pulseSeconds = pulseSeconds
+ msgBrokerOptions.ip = serverIp
+
+ // serverOptions.v.pulseSeconds = pulseSeconds
+ // masterOptions.pulseSeconds = pulseSeconds
+
+ masterOptions.whiteList = serverWhiteListOption
filerOptions.dataCenter = serverDataCenter
+ filerOptions.disableHttp = serverDisableHttp
+ masterOptions.disableHttp = serverDisableHttp
+
+ filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
+ s3Options.filer = &filerAddress
+ msgBrokerOptions.filer = &filerAddress
if *filerOptions.defaultReplicaPlacement == "" {
- *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement
+ *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
}
- if *serverMaxCpu < 1 {
- *serverMaxCpu = runtime.NumCPU()
- }
- runtime.GOMAXPROCS(*serverMaxCpu)
+ runtime.GOMAXPROCS(runtime.NumCPU())
folders := strings.Split(*volumeDataFolders, ",")
- if *masterVolumeSizeLimitMB > 30*1000 {
+ if *masterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 {
glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000")
}
- if *masterMetaFolder == "" {
- *masterMetaFolder = folders[0]
+ if *masterOptions.metaFolder == "" {
+ *masterOptions.metaFolder = folders[0]
}
- if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
- glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err)
+ if err := util.TestFolderWritable(*masterOptions.metaFolder); err != nil {
+ glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterOptions.metaFolder, err)
}
- filerOptions.defaultLevelDbDirectory = masterMetaFolder
+ filerOptions.defaultLevelDbDirectory = masterOptions.metaFolder
if *serverWhiteListOption != "" {
serverWhiteList = strings.Split(*serverWhiteListOption, ",")
@@ -159,68 +193,29 @@ func runServer(cmd *Command, args []string) bool {
}()
}
- var raftWaitForMaster sync.WaitGroup
- var volumeWait sync.WaitGroup
-
- raftWaitForMaster.Add(1)
- volumeWait.Add(1)
-
- go func() {
- r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
- *masterVolumeSizeLimitMB, *masterVolumePreallocate,
- *pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
- serverWhiteList, *serverSecureKey,
- )
+ if *isStartingS3 {
+ go func() {
+ time.Sleep(2 * time.Second)
- glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterPort)
- masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), 0)
- if e != nil {
- glog.Fatalf("Master startup error: %v", e)
- }
+ s3Options.startS3Server()
- go func() {
- // starting grpc server
- grpcPort := *masterGrpcPort
- if grpcPort == 0 {
- grpcPort = *masterPort + 10000
- }
- grpcL, err := util.NewListener(*serverIp+":"+strconv.Itoa(grpcPort), 0)
- if err != nil {
- glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
- }
- // Create your protocol servers.
- grpcS := util.NewGrpcServer()
- master_pb.RegisterSeaweedServer(grpcS, ms)
- reflection.Register(grpcS)
-
- glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort)
- grpcS.Serve(grpcL)
}()
+ }
+ if *isStartingMsgBroker {
go func() {
- raftWaitForMaster.Wait()
- time.Sleep(100 * time.Millisecond)
- myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers)
- raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *pulseSeconds)
- ms.SetRaftServer(raftServer)
- volumeWait.Done()
+ time.Sleep(2 * time.Second)
+ msgBrokerOptions.startQueueServer()
}()
+ }
- raftWaitForMaster.Done()
-
- // start http server
- httpS := &http.Server{Handler: r}
- if err := httpS.Serve(masterListener); err != nil {
- glog.Fatalf("master server failed to serve: %v", err)
- }
-
- }()
+ // start volume server
+ {
+ go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeMinFreeSpacePercent)
- volumeWait.Wait()
- time.Sleep(100 * time.Millisecond)
+ }
- serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
+ startMaster(masterOptions, serverWhiteList)
return true
}
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 19c5049c5..6dd768f47 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -1,61 +1,47 @@
package command
import (
- "bufio"
"fmt"
- "os"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/shell"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ shellOptions shell.ShellOptions
+ shellInitialFiler *string
)
func init() {
cmdShell.Run = runShell // break init cycle
+ shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
}
var cmdShell = &Command{
UsageLine: "shell",
- Short: "run interactive commands, now just echo",
- Long: `run interactive commands.
+ Short: "run interactive administrative commands",
+ Long: `run interactive administrative commands.
`,
}
-var ()
-
func runShell(command *Command, args []string) bool {
- r := bufio.NewReader(os.Stdin)
- o := bufio.NewWriter(os.Stdout)
- e := bufio.NewWriter(os.Stderr)
- prompt := func() {
- var err error
- if _, err = o.WriteString("> "); err != nil {
- glog.V(0).Infoln("error writing to stdout:", err)
- }
- if err = o.Flush(); err != nil {
- glog.V(0).Infoln("error flushing stdout:", err)
- }
- }
- readLine := func() string {
- ret, err := r.ReadString('\n')
- if err != nil {
- fmt.Fprint(e, err)
- os.Exit(1)
- }
- return ret
- }
- execCmd := func(cmd string) int {
- if cmd != "" {
- if _, err := o.WriteString(cmd); err != nil {
- glog.V(0).Infoln("error writing to stdout:", err)
- }
- }
- return 0
- }
- cmd := ""
- for {
- prompt()
- cmd = readLine()
- execCmd(cmd)
+ util.LoadConfiguration("security", false)
+ shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ var err error
+ shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+ if err != nil {
+ fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
+ return false
}
+ shellOptions.Directory = "/"
+
+ shell.RunShell(shellOptions)
+
+ return true
+
}
diff --git a/weed/command/upload.go b/weed/command/upload.go
index f664c0e3a..358897aee 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
var (
@@ -15,15 +16,15 @@ var (
)
type UploadOptions struct {
- master *string
- dir *string
- include *string
- replication *string
- collection *string
- dataCenter *string
- ttl *string
- maxMB *int
- secretKey *string
+ master *string
+ dir *string
+ include *string
+ replication *string
+ collection *string
+ dataCenter *string
+ ttl *string
+ maxMB *int
+ usePublicUrl *bool
}
func init() {
@@ -36,8 +37,8 @@ func init() {
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
- upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
+ upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
+ upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
}
var cmdUpload = &Command{
@@ -53,14 +54,17 @@ var cmdUpload = &Command{
All files under the folder and subfolders will be uploaded, each with its own file key.
Optional parameter "-include" allows you to specify the file name patterns.
- If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly.
+ If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separately.
The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
`,
}
func runUpload(cmd *Command, args []string) bool {
- secret := security.Secret(*upload.secretKey)
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
if len(args) == 0 {
if *upload.dir == "" {
return false
@@ -77,9 +81,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*upload.master, parts,
- *upload.replication, *upload.collection, *upload.dataCenter,
- *upload.ttl, *upload.maxMB, secret)
+ results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -96,9 +98,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*upload.master, parts,
- *upload.replication, *upload.collection, *upload.dataCenter,
- *upload.ttl, *upload.maxMB, secret)
+ results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
diff --git a/weed/command/version.go b/weed/command/version.go
index 8fdd68ec8..9caf7dc4e 100644
--- a/weed/command/version.go
+++ b/weed/command/version.go
@@ -19,6 +19,6 @@ func runVersion(cmd *Command, args []string) bool {
cmd.Usage()
}
- fmt.Printf("version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
return true
}
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 27a075b5b..0a7d52049 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,7 +1,9 @@
package command
import (
+ "fmt"
"net/http"
+ httppprof "net/http/pprof"
"os"
"runtime"
"runtime/pprof"
@@ -9,12 +11,22 @@ import (
"strings"
"time"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util/httpdown"
+
+ "google.golang.org/grpc/reflection"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc/reflection"
)
var (
@@ -30,37 +42,40 @@ type VolumeServerOptions struct {
publicUrl *string
bindIp *string
masters *string
- pulseSeconds *int
idleConnectionTimeout *int
- maxCpu *int
dataCenter *string
rack *string
whiteList []string
indexType *string
- fixJpgOrientation *bool
readRedirect *bool
cpuProfile *string
memProfile *string
+ compactionMBPerSecond *int
+ fileSizeLimitMB *int
+ minFreeSpacePercents []float32
+ pprof *bool
+ // pulseSeconds *int
}
func init() {
cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
- v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
+ v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
- v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
+ // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
- v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
- v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
- v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
+ v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
+ v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
+ v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
}
var cmdVolume = &Command{
@@ -73,26 +88,39 @@ var cmdVolume = &Command{
var (
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
+ maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.")
volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly.")
)
func runVolume(cmd *Command, args []string) bool {
- if *v.maxCpu < 1 {
- *v.maxCpu = runtime.NumCPU()
+
+ util.LoadConfiguration("security", false)
+
+ runtime.GOMAXPROCS(runtime.NumCPU())
+
+ // If --pprof is set we assume the caller wants to be able to collect
+ // cpu and memory profiles via go tool pprof
+ if !*v.pprof {
+ grace.SetupProfiling(*v.cpuProfile, *v.memProfile)
}
- runtime.GOMAXPROCS(*v.maxCpu)
- util.SetupProfiling(*v.cpuProfile, *v.memProfile)
- v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption)
+ v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent)
return true
}
-func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
+func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption, minFreeSpacePercent string) {
- //Set multiple folders and each folder's max volume count limit'
+ // Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",")
+ for _, folder := range v.folders {
+ if err := util.TestFolderWritable(folder); err != nil {
+ glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
+ }
+ }
+
+ // set max
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil {
@@ -104,19 +132,33 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if len(v.folders) != len(v.folderMaxLimits) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits))
}
- for _, folder := range v.folders {
- if err := util.TestFolderWritable(folder); err != nil {
- glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
+
+ // set minFreeSpacePercent
+ minFreeSpacePercentStrings := strings.Split(minFreeSpacePercent, ",")
+ for _, freeString := range minFreeSpacePercentStrings {
+ if value, e := strconv.ParseFloat(freeString, 32); e == nil {
+ v.minFreeSpacePercents = append(v.minFreeSpacePercents, float32(value))
+ } else {
+ glog.Fatalf("The value specified in -minFreeSpacePercent not a valid value %s", freeString)
+ }
+ }
+ if len(v.minFreeSpacePercents) == 1 && len(v.folders) > 1 {
+ for i := 0; i < len(v.folders)-1; i++ {
+ v.minFreeSpacePercents = append(v.minFreeSpacePercents, v.minFreeSpacePercents[0])
}
}
+ if len(v.folders) != len(v.minFreeSpacePercents) {
+ glog.Fatalf("%d directories by -dir, but only %d minFreeSpacePercent is set by -minFreeSpacePercent", len(v.folders), len(v.minFreeSpacePercents))
+ }
- //security related white list configuration
+ // security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
}
if *v.ip == "" {
- *v.ip = "127.0.0.1"
+ *v.ip = util.DetectedHostAddress()
+ glog.V(0).Infof("detected volume server ip address: %v", *v.ip)
}
if *v.publicPort == 0 {
@@ -125,73 +167,169 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if *v.publicUrl == "" {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
}
- isSeperatedPublicPort := *v.publicPort != *v.port
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
- if isSeperatedPublicPort {
+ if v.isSeparatedPublicPort() {
publicVolumeMux = http.NewServeMux()
}
+ if *v.pprof {
+ volumeMux.HandleFunc("/debug/pprof/", httppprof.Index)
+ volumeMux.HandleFunc("/debug/pprof/cmdline", httppprof.Cmdline)
+ volumeMux.HandleFunc("/debug/pprof/profile", httppprof.Profile)
+ volumeMux.HandleFunc("/debug/pprof/symbol", httppprof.Symbol)
+ volumeMux.HandleFunc("/debug/pprof/trace", httppprof.Trace)
+ }
+
volumeNeedleMapKind := storage.NeedleMapInMemory
switch *v.indexType {
case "leveldb":
volumeNeedleMapKind = storage.NeedleMapLevelDb
- case "boltdb":
- volumeNeedleMapKind = storage.NeedleMapBoltDb
- case "btree":
- volumeNeedleMapKind = storage.NeedleMapBtree
+ case "leveldbMedium":
+ volumeNeedleMapKind = storage.NeedleMapLevelDbMedium
+ case "leveldbLarge":
+ volumeNeedleMapKind = storage.NeedleMapLevelDbLarge
}
masters := *v.masters
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
- v.folders, v.folderMaxLimits,
+ v.folders, v.folderMaxLimits, v.minFreeSpacePercents,
volumeNeedleMapKind,
- strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack,
+ strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList,
- *v.fixJpgOrientation, *v.readRedirect,
+ *v.readRedirect,
+ *v.compactionMBPerSecond,
+ *v.fileSizeLimitMB,
)
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
- glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
- listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
- }
- if isSeperatedPublicPort {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
+ // starting grpc server
+ grpcS := v.startGrpcService(volumeServer)
+
+ // starting public http server
+ var publicHttpDown httpdown.Server
+ if v.isSeparatedPublicPort() {
+ publicHttpDown = v.startPublicHttpService(publicVolumeMux)
+ if nil == publicHttpDown {
+ glog.Fatalf("start public http service failed")
}
- go func() {
- if e := http.Serve(publicListener, publicVolumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve public: %v", e)
- }
- }()
}
- util.OnInterrupt(func() {
+ // starting the cluster http server
+ clusterHttpServer := v.startClusterHttpService(volumeMux)
+
+ stopChain := make(chan struct{})
+ grace.OnInterrupt(func() {
+ fmt.Println("volume server has be killed")
+ var startTime time.Time
+
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ startTime = time.Now()
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop public http server, elapsed %dms", delta)
+ }
+
+ startTime = time.Now()
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
+
+ startTime = time.Now()
+ grpcS.GracefulStop()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+
+ startTime = time.Now()
volumeServer.Shutdown()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+
pprof.StopCPUProfile()
+
+ close(stopChain) // notify exit
})
- // starting grpc server
+ select {
+ case <-stopChain:
+ }
+ glog.Warningf("the volume server exit.")
+}
+
+// check whether configure the public port
+func (v VolumeServerOptions) isSeparatedPublicPort() bool {
+ return *v.publicPort != *v.port
+}
+
+func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
grpcPort := *v.port + 10000
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer()
- volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
+ volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
+ go func() {
+ if err := grpcS.Serve(grpcL); err != nil {
+ glog.Fatalf("start gRPC service failed, %s", err)
+ }
+ }()
+ return grpcS
+}
- if e := http.Serve(listener, volumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
+ publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress)
+ publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute}
+ publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener)
+ go func() {
+ if err := publicHttpDown.Wait(); err != nil {
+ glog.Errorf("public http down wait failed, %v", err)
+ }
+ }()
+
+ return publicHttpDown
+}
+
+func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server {
+ var (
+ certFile, keyFile string
+ )
+ if viper.GetString("https.volume.key") != "" {
+ certFile = viper.GetString("https.volume.cert")
+ keyFile = viper.GetString("https.volume.key")
+ }
+
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress)
+ listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
}
+ httpDown := httpdown.HTTP{
+ KillTimeout: 5 * time.Minute,
+ StopTimeout: 5 * time.Minute,
+ CertFile: certFile,
+ KeyFile: keyFile}
+ clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ go func() {
+ if e := clusterHttpServer.Wait(); e != nil {
+ glog.Fatalf("Volume server fail to serve: %v", e)
+ }
+ }()
+ return clusterHttpServer
}
diff --git a/weed/command/watch.go b/weed/command/watch.go
new file mode 100644
index 000000000..b46707a62
--- /dev/null
+++ b/weed/command/watch.go
@@ -0,0 +1,65 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ cmdWatch.Run = runWatch // break init cycle
+}
+
+var cmdWatch = &Command{
+ UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]",
+ Short: "see recent changes on a filer",
+ Long: `See recent changes on a filer.
+
+ `,
+}
+
+var (
+ watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+)
+
+func runWatch(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ClientName: "watch",
+ PathPrefix: *watchTarget,
+ SinceNs: time.Now().Add(-*watchStart).UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ fmt.Printf("events: %+v\n", resp.EventNotification)
+ }
+
+ })
+ if watchErr != nil {
+ fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
+ }
+
+ return true
+}
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
new file mode 100644
index 000000000..b9676c909
--- /dev/null
+++ b/weed/command/webdav.go
@@ -0,0 +1,142 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "os"
+ "os/user"
+ "strconv"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ webDavStandaloneOptions WebDavOption
+)
+
+type WebDavOption struct {
+ filer *string
+ port *int
+ collection *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ cacheDir *string
+ cacheSizeMB *int64
+}
+
+func init() {
+ cmdWebDav.Run = runWebDav // break init cycle
+ webDavStandaloneOptions.filer = cmdWebDav.Flag.String("filer", "localhost:8888", "filer server address")
+ webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port")
+ webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
+ webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
+ webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
+ webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
+}
+
+var cmdWebDav = &Command{
+ UsageLine: "webdav -port=7333 -filer=<ip:port>",
+ Short: "start a webdav server that is backed by a filer",
+ Long: `start a webdav server that is backed by a filer.
+
+`,
+}
+
+func runWebDav(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.Version(), *webDavStandaloneOptions.port)
+
+ return webDavStandaloneOptions.startWebDav()
+
+}
+
+func (wo *WebDavOption) startWebDav() bool {
+
+ // detect current user
+ uid, gid := uint32(0), uint32(0)
+ if u, err := user.Current(); err == nil {
+ if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
+ uid = uint32(parsedId)
+ }
+ if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
+ gid = uint32(parsedId)
+ }
+ }
+
+ // parse filer grpc address
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ var cipher bool
+ // connect to filer
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{
+ Filer: *wo.filer,
+ FilerGrpcAddress: filerGrpcAddress,
+ GrpcDialOption: grpcDialOption,
+ Collection: *wo.collection,
+ Uid: uid,
+ Gid: gid,
+ Cipher: cipher,
+ CacheDir: *wo.cacheDir,
+ CacheSizeMB: *wo.cacheSizeMB,
+ })
+ if webdavServer_err != nil {
+ glog.Fatalf("WebDav Server startup error: %v", webdavServer_err)
+ }
+
+ httpS := &http.Server{Handler: ws.Handler}
+
+ listenAddress := fmt.Sprintf(":%d", *wo.port)
+ webDavListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
+ if err != nil {
+ glog.Fatalf("WebDav Server listener on %s error: %v", listenAddress, err)
+ }
+
+ if *wo.tlsPrivateKey != "" {
+ glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.Version(), *wo.port)
+ if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil {
+ glog.Fatalf("WebDav Server Fail to serve: %v", err)
+ }
+ } else {
+ glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.Version(), *wo.port)
+ if err = httpS.Serve(webDavListener); err != nil {
+ glog.Fatalf("WebDav Server Fail to serve: %v", err)
+ }
+ }
+
+ return true
+
+}