author     shibinbin <shibinbin@megvii.com>   2020-10-28 11:36:42 +0800
committer  shibinbin <shibinbin@megvii.com>   2020-10-28 11:36:42 +0800
commit     7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch)
tree       5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/command
parent     29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff)
parent     53c3aad87528d57343afc5fdb3fb5107544af0fc (diff)
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/command')
-rw-r--r--  weed/command/backup.go     |   4
-rw-r--r--  weed/command/benchmark.go  |  60
-rw-r--r--  weed/command/command.go    |   1
-rw-r--r--  weed/command/compact.go    |   3
-rw-r--r--  weed/command/download.go   |  16
-rw-r--r--  weed/command/export.go     |  42
-rw-r--r--  weed/command/filer.go      |  44
-rw-r--r--  weed/command/filer_copy.go |  25
-rw-r--r--  weed/command/filer_sync.go | 337
-rw-r--r--  weed/command/fix.go        |   9
-rw-r--r--  weed/command/master.go     |  65
-rw-r--r--  weed/command/mount.go      |  34
-rw-r--r--  weed/command/mount_std.go  |  35
-rw-r--r--  weed/command/s3.go         |  41
-rw-r--r--  weed/command/scaffold.go   |  18
-rw-r--r--  weed/command/server.go     |  34
-rw-r--r--  weed/command/upload.go     |   2
-rw-r--r--  weed/command/volume.go     | 108
-rw-r--r--  weed/command/watch.go      |  60
-rw-r--r--  weed/command/webdav.go     |   2
20 files changed, 731 insertions, 209 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 615be80cf..950cbf68e 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
}
- v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
+ v, err := storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
@@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool {
// remove the old data
v.Destroy()
// recreate an empty volume
- v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
+ v, err = storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index de44fac75..e241a904e 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -2,7 +2,6 @@ package command
import (
"bufio"
- "context"
"fmt"
"io"
"math"
@@ -19,7 +18,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
@@ -41,7 +39,6 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
- grpcRead *bool
fsync *bool
}
@@ -67,7 +64,6 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
sharedBytes = make([]byte, 1024)
}
@@ -286,25 +282,20 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
- if *b.grpcRead {
- volumeServer, err := b.masterClient.LookupVolumeServer(fid)
- if err != nil {
- s.failed++
- println("!!!! ", fid, " location not found!!!!!")
- continue
- }
- bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
- } else {
- url, err := b.masterClient.LookupFileId(fid)
- if err != nil {
- s.failed++
- println("!!!! ", fid, " location not found!!!!!")
- continue
+ urls, err := b.masterClient.LookupFileId(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ var bytes []byte
+ for _, url := range urls {
+ bytes, _, err = util.Get(url)
+ if err == nil {
+ break
}
- var bytes []byte
- bytes, err = util.Get(url)
- bytesRead = len(bytes)
}
+ bytesRead = len(bytes)
if err == nil {
s.completed++
s.transferred += int64(bytesRead)
@@ -316,29 +307,6 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
}
-func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
- err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid})
- if err != nil {
- return err
- }
-
- for {
- resp, respErr := fileGetClient.Recv()
- if resp != nil {
- bytesRead += len(resp.Data)
- }
- if respErr != nil {
- if respErr == io.EOF {
- return nil
- }
- return respErr
- }
- }
- })
- return
-}
-
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
@@ -396,7 +364,7 @@ func readFileIds(fileName string, fileIdLineChan chan string) {
}
const (
- benchResolution = 10000 //0.1 microsecond
+ benchResolution = 10000 // 0.1 microsecond
benchBucket = 1000000000 / benchResolution
)
@@ -519,7 +487,7 @@ func (s *stats) printStats() {
fmt.Printf("\nConnection Times (ms)\n")
fmt.Printf(" min avg max std\n")
fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
- //printing percentiles
+ // printing percentiles
fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
percentiles := make([]int, len(percentages))
for i := 0; i < len(percentages); i++ {
diff --git a/weed/command/command.go b/weed/command/command.go
index 9a41a8a7c..0df22b575 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -16,6 +16,7 @@ var Commands = []*Command{
cmdExport,
cmdFiler,
cmdFilerReplicate,
+ cmdFilerSynchronize,
cmdFix,
cmdMaster,
cmdMount,
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 4e28aa725..6117cf9f3 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -40,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
+ v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid,
storage.NeedleMapInMemory, nil, nil, preallocate, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
diff --git a/weed/command/download.go b/weed/command/download.go
index be0eb47e5..f7588fbf0 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net/http"
"os"
"path"
"strings"
@@ -43,7 +44,7 @@ var cmdDownload = &Command{
func runDownload(cmd *Command, args []string) bool {
for _, fid := range args {
- if e := downloadToFile(*d.server, fid, *d.dir); e != nil {
+ if e := downloadToFile(*d.server, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}
@@ -59,7 +60,7 @@ func downloadToFile(server, fileId, saveDir string) error {
if err != nil {
return err
}
- defer rc.Close()
+ defer util.CloseResponse(rc)
if filename == "" {
filename = fileId
}
@@ -71,12 +72,11 @@ func downloadToFile(server, fileId, saveDir string) error {
}
f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
- io.Copy(ioutil.Discard, rc)
return err
}
defer f.Close()
if isFileList {
- content, err := ioutil.ReadAll(rc)
+ content, err := ioutil.ReadAll(rc.Body)
if err != nil {
return err
}
@@ -95,7 +95,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
}
} else {
- if _, err = io.Copy(f, rc); err != nil {
+ if _, err = io.Copy(f, rc.Body); err != nil {
return err
}
@@ -108,12 +108,12 @@ func fetchContent(server string, fileId string) (filename string, content []byte
if lookupError != nil {
return "", nil, lookupError
}
- var rc io.ReadCloser
+ var rc *http.Response
if filename, _, rc, e = util.DownloadFile(fileUrl); e != nil {
return "", nil, e
}
- content, e = ioutil.ReadAll(rc)
- rc.Close()
+ defer util.CloseResponse(rc)
+ content, e = ioutil.ReadAll(rc.Body)
return
}
diff --git a/weed/command/export.go b/weed/command/export.go
index 8c32b3f4d..78d75ef52 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -19,10 +19,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const (
- defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}`
+ defaultFnFormat = `{{.Id}}_{{.Name}}{{.Ext}}`
timeFormat = "2006-01-02T15:04:05"
)
@@ -55,7 +56,7 @@ func init() {
var (
output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
- format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}")
+ format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Id}} {{.Name}} {{.Ext}}")
newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone, e.g. 2006-01-02T15:04:05")
showDeleted = cmdExport.Flag.Bool("deleted", false, "export deleted files. only applies if -o is not specified")
limit = cmdExport.Flag.Int("limit", 0, "only show first n entries if specified")
@@ -69,21 +70,23 @@ var (
localLocation, _ = time.LoadLocation("Local")
)
-func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool) {
+func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool, offset int64, onDiskSize int64) {
key := needle.NewFileIdFromNeedle(vid, n).String()
- size := n.DataSize
+ size := int32(n.DataSize)
if version == needle.Version1 {
- size = n.Size
+ size = int32(n.Size)
}
- fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\n",
+ fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\t%d\t%d\n",
key,
n.Name,
size,
- n.IsGzipped(),
+ n.IsCompressed(),
n.Mime,
n.LastModifiedString(),
n.Ttl.String(),
deleted,
+ offset,
+ offset+onDiskSize,
)
}
@@ -108,9 +111,9 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
vid := scanner.vid
nv, ok := needleMap.Get(n.Id)
- glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v",
- n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped(), ok, nv)
- if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset {
+ glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v",
+ n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv)
+ if *showDeleted && n.Size > 0 || ok && nv.Size.IsValid() && nv.Offset.ToAcutalOffset() == offset {
if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) {
glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d",
n.LastModified, newerThanUnix)
@@ -123,17 +126,17 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
if tarOutputFile != nil {
return writeFile(vid, n)
} else {
- printNeedle(vid, n, scanner.version, false)
+ printNeedle(vid, n, scanner.version, false, offset, n.DiskSize(scanner.version))
return nil
}
}
if !ok {
if *showDeleted && tarOutputFile == nil {
if n.DataSize > 0 {
- printNeedle(vid, n, scanner.version, true)
+ printNeedle(vid, n, scanner.version, true, offset, n.DiskSize(scanner.version))
} else {
n.Name = []byte("*tombstone")
- printNeedle(vid, n, scanner.version, true)
+ printNeedle(vid, n, scanner.version, true, offset, n.DiskSize(scanner.version))
}
}
glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size)
@@ -197,7 +200,7 @@ func runExport(cmd *Command, args []string) bool {
needleMap := needle_map.NewMemDb()
defer needleMap.Close()
- if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
+ if err := needleMap.LoadFromIdx(path.Join(util.ResolvePath(*export.dir), fileName+".idx")); err != nil {
glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
@@ -207,10 +210,10 @@ func runExport(cmd *Command, args []string) bool {
}
if tarOutputFile == nil {
- fmt.Printf("key\tname\tsize\tgzip\tmime\tmodified\tttl\tdeleted\n")
+ fmt.Printf("key\tname\tsize\tgzip\tmime\tmodified\tttl\tdeleted\tstart\tstop\n")
}
- err = storage.ScanVolumeFile(*export.dir, *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner)
+ err = storage.ScanVolumeFile(util.ResolvePath(*export.dir), *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner)
if err != nil && err != io.EOF {
glog.Fatalf("Export Volume File [ERROR] %s\n", err)
}
@@ -242,8 +245,11 @@ func writeFile(vid needle.VolumeId, n *needle.Needle) (err error) {
fileName := fileNameTemplateBuffer.String()
- if n.IsGzipped() && path.Ext(fileName) != ".gz" {
- fileName = fileName + ".gz"
+ if n.IsCompressed() {
+ if util.IsGzippedContent(n.Data) && path.Ext(fileName) != ".gz" {
+ fileName = fileName + ".gz"
+ }
+ // TODO other compression method
}
tarHeader.Name, tarHeader.Size = fileName, int64(len(n.Data))
diff --git a/weed/command/filer.go b/weed/command/filer.go
index e258b695d..1ea334e61 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"strconv"
"strings"
@@ -13,11 +14,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
- f FilerOptions
+ f FilerOptions
+ filerStartS3 *bool
+ filerS3Options S3Options
)
type FilerOptions struct {
@@ -32,9 +36,12 @@ type FilerOptions struct {
maxMB *int
dirListingLimit *int
dataCenter *string
+ rack *string
enableNotification *bool
disableHttp *bool
cipher *bool
+ peers *string
+ metricsHttpPort *int
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
@@ -48,13 +55,24 @@ func init() {
f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
- f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
+ f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
+ f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
+ f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
+ f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+
+ // start s3 on filer
+ filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
+ filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port")
+ filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
+ filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
+ filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
+ filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
}
var cmdFiler = &Command{
@@ -82,6 +100,17 @@ func runFiler(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
+ go stats_collect.StartMetricsServer(*f.metricsHttpPort)
+
+ if *filerStartS3 {
+ filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port)
+ filerS3Options.filer = &filerAddress
+ go func() {
+ time.Sleep(2 * time.Second)
+ filerS3Options.startS3Server()
+ }()
+ }
+
f.startFiler()
return true
@@ -98,7 +127,12 @@ func (fo *FilerOptions) startFiler() {
defaultLevelDbDirectory := "./filerldb2"
if fo.defaultLevelDbDirectory != nil {
- defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerldb2"
+ defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
+ }
+
+ var peers []string
+ if *fo.peers != "" {
+ peers = strings.Split(*fo.peers, ",")
}
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
@@ -109,11 +143,13 @@ func (fo *FilerOptions) startFiler() {
MaxMB: *fo.maxMB,
DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter,
+ Rack: *fo.rack,
DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp,
Host: *fo.ip,
Port: uint32(*fo.port),
Cipher: *fo.cipher,
+ Filers: peers,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
@@ -144,7 +180,7 @@ func (fo *FilerOptions) startFiler() {
// starting grpc server
grpcPort := *fo.port + 10000
- grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0)
+ grpcL, err := util.NewListener(*fo.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
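
For reference, the new -s3 switch added above lets a single filer process also serve the S3 gateway; a hypothetical invocation using only the flags defined in this hunk (port and config path are placeholders) might look like:

    weed filer -s3 -s3.port=8333 -s3.config=/etc/seaweedfs/s3_config.json
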
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 2d6ba94d6..2295faa8a 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -72,7 +72,7 @@ var cmdCopy = &Command{
If "maxMB" is set to a positive number, files larger than it would be split into chunks.
- `,
+`,
}
func runCopy(cmd *Command, args []string) bool {
@@ -111,11 +111,23 @@ func runCopy(cmd *Command, args []string) bool {
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
+ masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
if err != nil {
fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
return false
}
+ if strings.HasPrefix(urlPath, dirBuckets+"/") {
+ restPath := urlPath[len(dirBuckets)+1:]
+ if strings.Index(restPath, "/") > 0 {
+ expectedBucket := restPath[:strings.Index(restPath, "/")]
+ if *copy.collection == "" {
+ *copy.collection = expectedBucket
+ } else {
+ fmt.Printf("destination %s uses collection \"%s\": unexpected collection \"%v\"\n", urlPath, expectedBucket, *copy.collection)
+ return true
+ }
+ }
+ }
if *copy.collection == "" {
*copy.collection = collection
}
@@ -170,13 +182,14 @@ func runCopy(cmd *Command, args []string) bool {
return true
}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
+ dirBuckets = resp.DirBuckets
cipher = resp.Cipher
return nil
})
@@ -206,7 +219,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
fileCopyTaskChan <- FileCopyTask{
sourceLocation: fileOrDir,
- destinationUrlPath: destPath,
+ destinationUrlPath: destPath+fi.Name(),
fileSize: fi.Size(),
fileMode: fi.Mode(),
uid: uid,
@@ -298,7 +311,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
- ParentPath: task.destinationUrlPath,
+ Path: task.destinationUrlPath,
}
assignResult, assignError = client.AssignVolume(context.Background(), request)
@@ -392,7 +405,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
- ParentPath: task.destinationUrlPath,
+ Path: task.destinationUrlPath+fileName,
}
assignResult, assignError = client.AssignVolume(context.Background(), request)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
new file mode 100644
index 000000000..af0a624b1
--- /dev/null
+++ b/weed/command/filer_sync.go
@@ -0,0 +1,337 @@
+package command
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+ "google.golang.org/grpc"
+ "io"
+ "strings"
+ "time"
+)
+
+type SyncOptions struct {
+ isActivePassive *bool
+ filerA *string
+ filerB *string
+ aPath *string
+ bPath *string
+ aReplication *string
+ bReplication *string
+ aCollection *string
+ bCollection *string
+ aTtlSec *int
+ bTtlSec *int
+ aDebug *bool
+ bDebug *bool
+}
+
+var (
+ syncOptions SyncOptions
+ syncCpuProfile *string
+ syncMemProfile *string
+)
+
+func init() {
+ cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
+ syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true")
+ syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
+ syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
+ syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
+ syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
+ syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
+ syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
+ syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
+ syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
+ syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
+ syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
+ syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
+ syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
+ syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
+ syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
+}
+
+var cmdFilerSynchronize = &Command{
+ UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
+ Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters",
+ Long: `continuously synchronize file changes between two active-active or active-passive filers
+
+ filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
+ and write to the other destination. Different from filer.replicate:
+
+ * filer.sync only works between two filers.
+ * filer.sync does not need any special message queue setup.
+ * filer.sync supports both active-active and active-passive modes.
+
+ If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
+ A fresh sync will start from the earliest metadata logs.
+
+`,
+}
+
+func runFilerSynchronize(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
+
+ go func() {
+ for {
+ err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB,
+ *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug)
+ if err != nil {
+ glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+ }()
+
+ if !*syncOptions.isActivePassive {
+ go func() {
+ for {
+ err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA,
+ *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug)
+ if err != nil {
+ glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
+ time.Sleep(2147 * time.Millisecond)
+ }
+ }
+ }()
+ }
+
+ select {}
+
+ return true
+}
+
+func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string,
+ replicationStr, collection string, ttlSec int, debug bool) error {
+
+ // read source filer signature
+ sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
+ if sourceErr != nil {
+ return sourceErr
+ }
+ // read target filer signature
+ targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
+ if targetErr != nil {
+ return targetErr
+ }
+
+ // if first time, start from now
+ // if has previously synced, resume from that point of time
+ sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
+ if err != nil {
+ return err
+ }
+
+ glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
+
+ // create filer sink
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)
+ filerSink := &filersink.FilerSink{}
+ filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption)
+ filerSink.SetSourceFiler(filerSource)
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ var sourceOldKey, sourceNewKey util.FullPath
+ if message.OldEntry != nil {
+ sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
+ }
+ if message.NewEntry != nil {
+ sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
+ }
+
+ for _, sig := range message.Signatures {
+ if sig == targetFilerSignature && targetFilerSignature != 0 {
+ fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
+ return nil
+ }
+ }
+ if debug {
+ fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
+ }
+
+ if !strings.HasPrefix(resp.Directory, sourcePath) {
+ return nil
+ }
+
+ // handle deletions
+ if message.OldEntry != nil && message.NewEntry == nil {
+ if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ return nil
+ }
+ key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+
+ // handle new entries
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ return nil
+ }
+ key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
+ return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ }
+
+ // this is something special?
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+
+ // handle updates
+ if strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ // old key is in the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is also in the watched directory
+ oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
+ foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
+ if foundExisting {
+ return err
+ }
+
+ // not able to find old entry
+ if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
+ return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ }
+
+ // create the new entry
+ newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
+ return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
+
+ } else {
+ // new key is outside of the watched directory
+ key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ } else {
+ // old key is outside of the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is in the watched directory
+ key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
+ return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ } else {
+ // new key is also outside of the watched directory
+ // skip
+ }
+ }
+
+ return nil
+ }
+
+ return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "syncTo_" + targetFiler,
+ PathPrefix: sourcePath,
+ SinceNs: sourceFilerOffsetTsNs,
+ Signature: targetFilerSignature,
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return err
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
+ return err
+ }
+ }
+
+ }
+
+ })
+
+}
+
+const (
+ SyncKeyPrefix = "sync."
+)
+
+func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
+
+ readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ syncKey := []byte(SyncKeyPrefix + "____")
+ util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+
+ resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
+ if err != nil {
+ return err
+ }
+
+ if len(resp.Error) != 0 {
+ return errors.New(resp.Error)
+ }
+ if len(resp.Value) < 8 {
+ return nil
+ }
+
+ lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
+
+ return nil
+ })
+
+ return
+
+}
+
+func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
+ return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ syncKey := []byte(SyncKeyPrefix + "____")
+ util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+
+ valueBuf := make([]byte, 8)
+ util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
+
+ resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
+ Key: syncKey,
+ Value: valueBuf,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(resp.Error) != 0 {
+ return errors.New(resp.Error)
+ }
+
+ return nil
+
+ })
+
+}
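
For reference, the filer.sync command introduced above only needs the two filer addresses; a hypothetical invocation (host names are placeholders) might look like:

    weed filer.sync -a=filer-east:8888 -b=filer-west:8888 -a.path=/data -b.path=/data

Adding -isActivePassive limits synchronization to one direction, from filer A to filer B.
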
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 90d1c4893..ae9a051b8 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -46,8 +47,8 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
}
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
- glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
- if n.Size > 0 && n.Size != types.TombstoneFileSize {
+ glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
+ if n.Size.IsValid() {
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
@@ -67,7 +68,7 @@ func runFix(cmd *Command, args []string) bool {
if *fixVolumeCollection != "" {
baseFileName = *fixVolumeCollection + "_" + baseFileName
}
- indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
+ indexFileName := path.Join(util.ResolvePath(*fixVolumePath), baseFileName+".idx")
nm := needle_map.NewMemDb()
defer nm.Close()
@@ -77,7 +78,7 @@ func runFix(cmd *Command, args []string) bool {
nm: nm,
}
- if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
+ if err := storage.ScanVolumeFile(util.ResolvePath(*fixVolumePath), *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
glog.Fatalf("scan .dat File: %v", err)
os.Remove(indexFileName)
}
diff --git a/weed/command/master.go b/weed/command/master.go
index 21c759f4e..c03da7f5d 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,16 +1,18 @@
package command
import (
+ "github.com/chrislusf/raft/protobuf"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc/reflection"
"net/http"
"os"
"runtime"
+ "sort"
"strconv"
"strings"
+ "time"
- "github.com/chrislusf/raft/protobuf"
"github.com/chrislusf/seaweedfs/weed/util/grace"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -26,13 +28,13 @@ var (
)
type MasterOptions struct {
- port *int
- ip *string
- ipBind *string
- metaFolder *string
- peers *string
- volumeSizeLimitMB *uint
- volumePreallocate *bool
+ port *int
+ ip *string
+ ipBind *string
+ metaFolder *string
+ peers *string
+ volumeSizeLimitMB *uint
+ volumePreallocate *bool
// pulseSeconds *int
defaultReplication *string
garbageThreshold *float64
@@ -40,6 +42,7 @@ type MasterOptions struct {
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
+ raftResumeState *bool
}
func init() {
@@ -56,8 +59,9 @@ func init() {
m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
- m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address")
+ m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+ m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
}
var cmdMaster = &Command{
@@ -85,7 +89,11 @@ func runMaster(cmd *Command, args []string) bool {
runtime.GOMAXPROCS(runtime.NumCPU())
grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
- if err := util.TestFolderWritable(*m.metaFolder); err != nil {
+ parent, _ := util.FullPath(*m.metaFolder).DirAndName()
+ if util.FileExists(string(parent)) && !util.FileExists(*m.metaFolder) {
+ os.MkdirAll(*m.metaFolder, 0755)
+ }
+ if err := util.TestFolderWritable(util.ResolvePath(*m.metaFolder)); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
}
@@ -117,10 +125,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("Master startup error: %v", e)
}
// start raftServer
- raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
- peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, 5)
+ raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
+ peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState)
if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
@@ -138,6 +146,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
go grpcS.Serve(grpcL)
+ go func() {
+ time.Sleep(1500 * time.Millisecond)
+ if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
+ if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
+ }
+ }
+ }()
+
go ms.MasterClient.KeepConnectedToMaster()
// start http server
@@ -171,13 +188,21 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
return
}
+func isTheFirstOne(self string, peers []string) bool {
+ sort.Strings(peers)
+ if len(peers) <= 0 {
+ return true
+ }
+ return self == peers[0]
+}
+
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
return &weed_server.MasterOption{
- Host: *m.ip,
- Port: *m.port,
- MetaFolder: *m.metaFolder,
- VolumeSizeLimitMB: *m.volumeSizeLimitMB,
- VolumePreallocate: *m.volumePreallocate,
+ Host: *m.ip,
+ Port: *m.port,
+ MetaFolder: *m.metaFolder,
+ VolumeSizeLimitMB: *m.volumeSizeLimitMB,
+ VolumePreallocate: *m.volumePreallocate,
// PulseSeconds: *m.pulseSeconds,
DefaultReplicaPlacement: *m.defaultReplication,
GarbageThreshold: *m.garbageThreshold,
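
For reference, the new -resumeState flag above makes the master resume its previous Raft state when it restarts; a hypothetical invocation (the metadata directory is a placeholder) might look like:

    weed master -mdir=/data/seaweedfs/master -resumeState
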
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 21c8e7744..7fdb21254 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -2,13 +2,14 @@ package command
import (
"os"
+ "time"
)
type MountOptions struct {
filer *string
filerMountRootPath *string
dir *string
- dirListCacheLimit *int64
+ dirAutoCreate *bool
collection *string
replication *string
ttlSec *int
@@ -20,13 +21,15 @@ type MountOptions struct {
umaskString *string
nonempty *bool
outsideContainerClusterMode *bool
- asyncMetaDataCaching *bool
+ uidMap *string
+ gidMap *string
}
var (
- mountOptions MountOptions
- mountCpuProfile *string
- mountMemProfile *string
+ mountOptions MountOptions
+ mountCpuProfile *string
+ mountMemProfile *string
+ mountReadRetryTime *time.Duration
)
func init() {
@@ -34,21 +37,24 @@ func init() {
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
- mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing")
+ mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
- mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files")
- mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
- mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB (0 will disable cache)")
+ mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
+ mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
+ mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
+ mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access volume servers with publicUrl")
+ mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
+ mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
+
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
- mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
- mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.")
+ mountReadRetryTime = cmdMount.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time")
}
var cmdMount = &Command{
@@ -66,11 +72,5 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
- If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose,
- the volume servers are not accessible by their own ip addresses.
- In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
- * All volume server containers are accessible through the same hostname or IP address as the filer.
- * All volume server container ports are open external to the cluster.
-
`,
}
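
For reference, the new identity-mapping flags above take comma-separated <local>:<filer> pairs; a hypothetical invocation (ids and paths are placeholders) might look like:

    weed mount -filer=localhost:8888 -dir=/mnt/seaweedfs -map.uid=1000:1001 -map.gid=1000:1001
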
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index c95626651..20d08314c 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -5,6 +5,8 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"os"
"os/user"
"path"
@@ -13,6 +15,9 @@ import (
"strings"
"time"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -20,13 +25,15 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
func runMount(cmd *Command, args []string) bool {
grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
+ if *mountReadRetryTime < time.Second {
+ *mountReadRetryTime = time.Second
+ }
+ filer.ReadWaitTime = *mountReadRetryTime
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
if umaskErr != nil {
@@ -68,7 +75,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
filerMountRootPath := *option.filerMountRootPath
- dir := *option.dir
+ dir := util.ResolvePath(*option.dir)
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
util.LoadConfiguration("security", false)
@@ -85,15 +92,21 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
fuse.Unmount(dir)
- uid, gid := uint32(0), uint32(0)
-
// detect mount folder mode
- mountMode := os.ModeDir | 0755
+ if *option.dirAutoCreate {
+ os.MkdirAll(dir, os.FileMode(0777)&^umask)
+ }
fileInfo, err := os.Stat(dir)
+
+ uid, gid := uint32(0), uint32(0)
+ mountMode := os.ModeDir | 0777
if err == nil {
mountMode = os.ModeDir | fileInfo.Mode()
uid, gid = util.GetFileUidGid(fileInfo)
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode())
+ } else {
+ fmt.Printf("can not stat %s\n", dir)
+ return false
}
if uid == 0 {
@@ -108,6 +121,13 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
}
+ // mapping uid, gid
+ uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
+ if err != nil {
+ fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
+ return false
+ }
+
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
@@ -158,7 +178,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
- DirListCacheLimit: *option.dirListCacheLimit,
EntryCacheTtl: 3 * time.Second,
MountUid: uid,
MountGid: gid,
@@ -167,8 +186,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
MountMtime: time.Now(),
Umask: umask,
OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
- AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching,
Cipher: cipher,
+ UidGidMapper: uidGidMapper,
})
// mount
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 7ebd4fab0..ed5bb0b80 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -22,22 +23,24 @@ var (
)
type S3Options struct {
- filer *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
}
func init() {
cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
- s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
+ s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
}
var cmdS3 = &Command{
@@ -51,7 +54,13 @@ var cmdS3 = &Command{
{
"identities": [
{
- "name": "some_name",
+ "name": "anonymous",
+ "actions": [
+ "Read"
+ ]
+ },
+ {
+ "name": "some_admin_user",
"credentials": [
{
"accessKey": "some_access_key1",
@@ -61,6 +70,8 @@ var cmdS3 = &Command{
"actions": [
"Admin",
"Read",
+ "List",
+ "Tagging",
"Write"
]
},
@@ -86,6 +97,8 @@ var cmdS3 = &Command{
],
"actions": [
"Read",
+ "List",
+ "Tagging",
"Write"
]
},
@@ -99,6 +112,8 @@ var cmdS3 = &Command{
],
"actions": [
"Read:bucket1",
+ "List:bucket1",
+ "Tagging:bucket1",
"Write:bucket1"
]
}
@@ -112,6 +127,8 @@ func runS3(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
+ go stats_collect.StartMetricsServer(*s3StandaloneOptions.metricsHttpPort)
+
return s3StandaloneOptions.startS3Server()
}
@@ -128,6 +145,10 @@ func (s3opt *S3Options) startS3Server() bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ // metrics read from the filer
+ var metricsAddress string
+ var metricsIntervalSec int
+
for {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
@@ -135,6 +156,7 @@ func (s3opt *S3Options) startS3Server() bool {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
filerBucketsPath = resp.DirBuckets
+ metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
return nil
})
@@ -147,10 +169,13 @@ func (s3opt *S3Options) startS3Server() bool {
}
}
+ go stats_collect.LoopPushingMetric("s3", stats_collect.SourceName(uint32(*s3opt.port)), metricsAddress, metricsIntervalSec)
+
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: *s3opt.filer,
+ Port: *s3opt.port,
FilerGrpcAddress: filerGrpcAddress,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index b199f2d2d..c36e4a25f 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -140,6 +140,8 @@ keyspace="seaweedfs"
hosts=[
"localhost:9042",
]
+username=""
+password=""
[redis2]
enabled = false
@@ -173,6 +175,20 @@ enabled = false
uri = "mongodb://localhost:27017"
option_pool_size = 0
database = "seaweedfs"
+
+[elastic7]
+enabled = false
+servers = [
+ "http://localhost1:9200",
+ "http://localhost2:9200",
+ "http://localhost3:9200",
+]
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
+# increase the value is recommend, be sure the value in Elastic is greater or equal here
+index.max_result_window = 10000
`
NOTIFICATION_TOML_EXAMPLE = `
@@ -377,7 +393,7 @@ default = "localhost:8888" # used by maintenance scripts if the scripts needs
[master.sequencer]
-type = "memory" # Choose [memory|etcd] type for storing the file id sequence
+type = "raft" # Choose [raft|etcd] type for storing the file id sequence
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
# example : http://127.0.0.1:2379,http://127.0.0.1:2389
sequencer_etcd_urls = "http://127.0.0.1:2379"
diff --git a/weed/command/server.go b/weed/command/server.go
index 0af583a7f..6a78fb3f4 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"os"
"runtime"
"runtime/pprof"
@@ -30,7 +31,7 @@ func init() {
}
var cmdServer = &Command{
- UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name",
+ UsageLine: "server -dir=/tmp -volume.max=5 -ip=server_name",
Short: "start a master server, a volume server, and optionally a filer and a S3 gateway",
Long: `start both a volume server to provide storage spaces
and a master server to provide volume=>location mapping service and sequence number of file ids
@@ -54,13 +55,14 @@ var (
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.")
- volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly")
+ volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.")
+ volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly.")
+ serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
- isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
- isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
+ isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
+ isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
@@ -79,15 +81,17 @@ func init() {
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+ masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
- filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
+ filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
+ filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -95,12 +99,13 @@ func init() {
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
- serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
- serverOptions.v.pprof = &False
+ serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds to wait after stopping heartbeats before stopping the volume server")
+ serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
- s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
@@ -134,6 +139,7 @@ func runServer(cmd *Command, args []string) bool {
peers := strings.Join(peerList, ",")
masterOptions.peers = &peers
+ // ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
filerOptions.masters = &peers
@@ -153,6 +159,7 @@ func runServer(cmd *Command, args []string) bool {
masterOptions.whiteList = serverWhiteListOption
filerOptions.dataCenter = serverDataCenter
+ filerOptions.rack = serverRack
filerOptions.disableHttp = serverDisableHttp
masterOptions.disableHttp = serverDisableHttp
@@ -160,11 +167,8 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
- if *filerOptions.defaultReplicaPlacement == "" {
- *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
- }
-
runtime.GOMAXPROCS(runtime.NumCPU())
+ go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
folders := strings.Split(*volumeDataFolders, ",")
@@ -175,7 +179,7 @@ func runServer(cmd *Command, args []string) bool {
if *masterOptions.metaFolder == "" {
*masterOptions.metaFolder = folders[0]
}
- if err := util.TestFolderWritable(*masterOptions.metaFolder); err != nil {
+ if err := util.TestFolderWritable(util.ResolvePath(*masterOptions.metaFolder)); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterOptions.metaFolder, err)
}
filerOptions.defaultLevelDbDirectory = masterOptions.metaFolder
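The server changes above add a -metricsPort flag and call stats_collect.StartMetricsServer on startup. A minimal sketch of what a helper like that typically does, assuming the standard Prometheus Go client (the real implementation lives in weed/stats and may differ):

package stats

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartMetricsServer exposes a /metrics endpoint for Prometheus to
// scrape. A port of 0 keeps the feature disabled, matching the flag default.
func StartMetricsServer(port int) {
	if port == 0 {
		return
	}
	http.Handle("/metrics", promhttp.Handler())
	http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}

This appears to complement the existing push-based -metrics.address option: scraping works even without a Prometheus push gateway.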
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 358897aee..45b15535b 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -69,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool {
if *upload.dir == "" {
return false
}
- filepath.Walk(*upload.dir, func(path string, info os.FileInfo, err error) error {
+ filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
if err == nil {
if !info.IsDir() {
if *upload.include != "" {
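Several commands in this diff (backup, server, upload, volume, webdav) now pass user-supplied directories through util.ResolvePath before using them. A hedged sketch of what such a resolver would do, assuming its job is simply to expand a leading "~" to the user's home directory (the actual helper in weed/util may handle more cases):

package util

import (
	"os"
	"path/filepath"
	"strings"
)

// ResolvePath expands a leading "~" so flags like -dir=~/data behave
// consistently across subcommands. Other inputs pass through unchanged.
func ResolvePath(path string) string {
	if strings.HasPrefix(path, "~") {
		if home, err := os.UserHomeDir(); err == nil {
			return filepath.Join(home, strings.TrimPrefix(path, "~"))
		}
	}
	return path
}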
diff --git a/weed/command/volume.go b/weed/command/volume.go
index d0fdd2ed1..d73c24ed1 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -25,6 +25,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -53,8 +54,10 @@ type VolumeServerOptions struct {
memProfile *string
compactionMBPerSecond *int
fileSizeLimitMB *int
- minFreeSpacePercent []float32
+ minFreeSpacePercents []float32
pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
}
@@ -66,6 +69,7 @@ func init() {
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
+ v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds to wait after stopping heartbeats before stopping the volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
@@ -76,8 +80,9 @@ func init() {
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
- v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
+ v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
}
var cmdVolume = &Command{
@@ -90,9 +95,9 @@ var cmdVolume = &Command{
var (
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.")
+ maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum number of volumes, count[,count]... If set to zero, the limit will be auto configured.")
volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
- minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent ", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly")
+ minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent", "1", "minimum free disk space (defaults to 1%). Low disk space will mark all volumes as ReadOnly.")
)
func runVolume(cmd *Command, args []string) bool {
@@ -107,6 +112,8 @@ func runVolume(cmd *Command, args []string) bool {
grace.SetupProfiling(*v.cpuProfile, *v.memProfile)
}
+ go stats_collect.StartMetricsServer(*v.metricsHttpPort)
+
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent)
return true
@@ -116,6 +123,13 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",")
+ for _, folder := range v.folders {
+ if err := util.TestFolderWritable(util.ResolvePath(folder)); err != nil {
+ glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
+ }
+ }
+
+ // set the per-folder max volume counts
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil {
@@ -124,24 +138,32 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
glog.Fatalf("The max specified in -max not a valid number %s", maxString)
}
}
+ if len(v.folderMaxLimits) == 1 && len(v.folders) > 1 {
+ for i := 0; i < len(v.folders)-1; i++ {
+ v.folderMaxLimits = append(v.folderMaxLimits, v.folderMaxLimits[0])
+ }
+ }
if len(v.folders) != len(v.folderMaxLimits) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits))
}
+
+ // set minFreeSpacePercent
minFreeSpacePercentStrings := strings.Split(minFreeSpacePercent, ",")
for _, freeString := range minFreeSpacePercentStrings {
-
if value, e := strconv.ParseFloat(freeString, 32); e == nil {
- v.minFreeSpacePercent = append(v.minFreeSpacePercent, float32(value))
+ v.minFreeSpacePercents = append(v.minFreeSpacePercents, float32(value))
} else {
glog.Fatalf("The value specified in -minFreeSpacePercent not a valid value %s", freeString)
}
}
-
- for _, folder := range v.folders {
- if err := util.TestFolderWritable(folder); err != nil {
- glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
+ if len(v.minFreeSpacePercents) == 1 && len(v.folders) > 1 {
+ for i := 0; i < len(v.folders)-1; i++ {
+ v.minFreeSpacePercents = append(v.minFreeSpacePercents, v.minFreeSpacePercents[0])
}
}
+ if len(v.folders) != len(v.minFreeSpacePercents) {
+ glog.Fatalf("%d directories by -dir, but only %d minFreeSpacePercent is set by -minFreeSpacePercent", len(v.folders), len(v.minFreeSpacePercents))
+ }
// security related white list configuration
if volumeWhiteListOption != "" {
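With the changes above, a single -max or -minFreeSpacePercent value is now broadcast to every -dir folder instead of failing the count check. The pattern, pulled out into a standalone sketch (names here are illustrative, not from the source):

// broadcast repeats a single per-folder setting so that
// -dir=/d1,/d2,/d3 -minFreeSpacePercent=1 acts like 1,1,1,
// while still rejecting genuinely mismatched counts.
func broadcast(values []float32, folderCount int) ([]float32, bool) {
	if len(values) == 1 && folderCount > 1 {
		for i := 0; i < folderCount-1; i++ {
			values = append(values, values[0])
		}
	}
	return values, len(values) == folderCount
}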
@@ -188,7 +210,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
- v.folders, v.folderMaxLimits, v.minFreeSpacePercent,
+ v.folders, v.folderMaxLimits, v.minFreeSpacePercents,
volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList,
@@ -196,7 +218,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
)
-
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -212,47 +233,48 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
- stopChain := make(chan struct{})
+ stopChan := make(chan bool)
grace.OnInterrupt(func() {
fmt.Println("volume server has be killed")
- var startTime time.Time
-
- // firstly, stop the public http service to prevent from receiving new user request
- if nil != publicHttpDown {
- startTime = time.Now()
- if err := publicHttpDown.Stop(); err != nil {
- glog.Warningf("stop the public http server failed, %v", err)
- }
- delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("stop public http server, elapsed %dms", delta)
- }
- startTime = time.Now()
- if err := clusterHttpServer.Stop(); err != nil {
- glog.Warningf("stop the cluster http server failed, %v", err)
+ // Stop heartbeats
+ if !volumeServer.StopHeartbeat() {
+ glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
+ time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
}
- delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
- startTime = time.Now()
- grpcS.GracefulStop()
- delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+ shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer)
+ stopChan <- true
+ })
- startTime = time.Now()
- volumeServer.Shutdown()
- delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+ select {
+ case <-stopChan:
+ }
- pprof.StopCPUProfile()
+}
- close(stopChain) // notify exit
- })
+func shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server, grpcS *grpc.Server, volumeServer *weed_server.VolumeServer) {
- select {
- case <-stopChain:
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ glog.V(0).Infof("stop public http server ... ")
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ }
+
+ glog.V(0).Infof("graceful stop cluster http server ... ")
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
}
- glog.Warningf("the volume server exit.")
+
+ glog.V(0).Infof("graceful stop gRPC ...")
+ grpcS.GracefulStop()
+
+ volumeServer.Shutdown()
+
+ pprof.StopCPUProfile()
+
}
// check whether configure the public port
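The shutdown rewrite above replaces the inline teardown with a shared shutdown helper and a preStopSeconds grace period: heartbeats stop first so the master routes traffic elsewhere, then the public HTTP, cluster HTTP, gRPC, and volume server are stopped in order. A compact sketch of the interrupt-then-drain pattern, with a hypothetical onInterrupt standing in for grace.OnInterrupt:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"time"
)

// onInterrupt runs fn once when SIGINT arrives.
func onInterrupt(fn func()) {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		fn()
	}()
}

func main() {
	stopChan := make(chan bool)
	onInterrupt(func() {
		// stop advertising to the cluster, then wait for in-flight
		// requests to drain before tearing down servers
		time.Sleep(2 * time.Second) // stands in for -preStopSeconds
		fmt.Println("shutdown complete")
		stopChan <- true
	})
	<-stopChan
}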
diff --git a/weed/command/watch.go b/weed/command/watch.go
index b46707a62..fd7dd6fb2 100644
--- a/weed/command/watch.go
+++ b/weed/command/watch.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"io"
+ "path/filepath"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -17,7 +19,7 @@ func init() {
}
var cmdWatch = &Command{
- UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]",
+ UsageLine: "watch [-filer=localhost:8888] [-target=/]",
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
@@ -25,18 +27,61 @@ var cmdWatch = &Command{
}
var (
- watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
- watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
- watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
)
func runWatch(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var filterFunc func(dir, fname string) bool
+ if *watchPattern != "" {
+ if strings.Contains(*watchPattern, "/") {
+ println("watch path pattern", *watchPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ } else {
+ println("watch file pattern", *watchPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*watchPattern, fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ }
+ }
+
+ shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
+ if filterFunc == nil {
+ return true
+ }
+ if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
+ return false
+ }
+ if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
+ return true
+ }
+ if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
+ return true
+ }
+ return false
+ }
+
watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "watch",
PathPrefix: *watchTarget,
SinceNs: time.Now().Add(-*watchStart).UnixNano(),
@@ -53,7 +98,10 @@ func runWatch(cmd *Command, args []string) bool {
if listenErr != nil {
return listenErr
}
- fmt.Printf("events: %+v\n", resp.EventNotification)
+ if !shouldPrint(resp) {
+ continue
+ }
+ fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
}
})
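The new -pattern flag filters watch output with filepath.Match, applied to the full dir/name when the pattern contains a "/" and to the bare file name otherwise. A small illustration of that matching behavior (sample values are made up):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// filename-only pattern: compared against the entry name
	ok, _ := filepath.Match("*.pdf", "report.pdf")
	fmt.Println(ok) // true

	// path pattern: compared against dir + "/" + name;
	// note that "*" and "?" never cross a "/" separator
	ok, _ = filepath.Match("/home/?opher", "/home/gopher")
	fmt.Println(ok) // true
}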
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index b9676c909..dc84b1fd0 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -110,7 +110,7 @@ func (wo *WebDavOption) startWebDav() bool {
Uid: uid,
Gid: gid,
Cipher: cipher,
- CacheDir: *wo.cacheDir,
+ CacheDir: util.ResolvePath(*wo.cacheDir),
CacheSizeMB: *wo.cacheSizeMB,
})
if webdavServer_err != nil {