aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorKonstantin Lebedev <lebedev_k@tochka.com>2021-04-06 13:50:33 +0500
committerKonstantin Lebedev <lebedev_k@tochka.com>2021-04-06 13:50:33 +0500
commit011e6e90ee8a3aeff6f845fec90331ad4714b514 (patch)
treeb661a90a1cc8c77b2085f120420b0bdd537bcf0d /weed/command
parented79baa30fe5687a35a9a61e2dcf3b4750064d36 (diff)
parent100ed773870b8826352f25e0cd72f60a591ecfa8 (diff)
downloadseaweedfs-011e6e90ee8a3aeff6f845fec90331ad4714b514.tar.xz
seaweedfs-011e6e90ee8a3aeff6f845fec90331ad4714b514.zip
Merge branch 'upstreamMaster' into iamapipr
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go27
-rw-r--r--weed/command/command.go2
-rw-r--r--weed/command/filer.go37
-rw-r--r--weed/command/filer_backup.go157
-rw-r--r--weed/command/filer_cat.go2
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/filer_meta_backup.go268
-rw-r--r--weed/command/filer_meta_tail.go8
-rw-r--r--weed/command/filer_replication.go29
-rw-r--r--weed/command/filer_sync.go203
-rw-r--r--weed/command/master.go7
-rw-r--r--weed/command/mount.go4
-rw-r--r--weed/command/mount_std.go24
-rw-r--r--weed/command/msg_broker.go2
-rw-r--r--weed/command/s3.go2
-rw-r--r--weed/command/scaffold.go37
-rw-r--r--weed/command/server.go36
-rw-r--r--weed/command/upload.go2
-rw-r--r--weed/command/volume.go86
-rw-r--r--weed/command/webdav.go2
20 files changed, 740 insertions, 197 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index c1bc80c42..4fedb55f1 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -41,6 +41,7 @@ type BenchmarkOptions struct {
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
fsync *bool
+ useTcp *bool
}
var (
@@ -67,6 +68,7 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
}
@@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
+ volumeTcpClient := wdclient.NewVolumeTcpClient()
+
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
@@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
+ if *b.useTcp {
+ if uploadByTcp(volumeTcpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
+ } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -293,7 +305,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
- bytes, _, err = util.FastGet(url)
+ bytes, _, err = util.Get(url)
if err == nil {
break
}
@@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b
}
}
+func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
+
+ err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
diff --git a/weed/command/command.go b/weed/command/command.go
index fea1dd9d3..ce754702f 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -15,7 +15,9 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
+ cmdFilerBackup,
cmdFilerCat,
+ cmdFilerMetaBackup,
cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index b256e385f..08385e62c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -49,6 +49,7 @@ type FilerOptions struct {
metricsHttpPort *int
saveToFilerLimit *int
defaultLevelDbDirectory *string
+ concurrentUploadLimitMB *int
}
func init() {
@@ -56,12 +57,12 @@ func init() {
f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
- f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
- f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
+ f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack")
@@ -71,6 +72,7 @@ func init() {
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
+ f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -176,21 +178,22 @@ func (fo *FilerOptions) startFiler() {
}
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
- Masters: strings.Split(*fo.masters, ","),
- Collection: *fo.collection,
- DefaultReplication: *fo.defaultReplicaPlacement,
- DisableDirListing: *fo.disableDirListing,
- MaxMB: *fo.maxMB,
- DirListingLimit: *fo.dirListingLimit,
- DataCenter: *fo.dataCenter,
- Rack: *fo.rack,
- DefaultLevelDbDir: defaultLevelDbDirectory,
- DisableHttp: *fo.disableHttp,
- Host: *fo.ip,
- Port: uint32(*fo.port),
- Cipher: *fo.cipher,
- SaveToFilerLimit: *fo.saveToFilerLimit,
- Filers: peers,
+ Masters: strings.Split(*fo.masters, ","),
+ Collection: *fo.collection,
+ DefaultReplication: *fo.defaultReplicaPlacement,
+ DisableDirListing: *fo.disableDirListing,
+ MaxMB: *fo.maxMB,
+ DirListingLimit: *fo.dirListingLimit,
+ DataCenter: *fo.dataCenter,
+ Rack: *fo.rack,
+ DefaultLevelDbDir: defaultLevelDbDirectory,
+ DisableHttp: *fo.disableHttp,
+ Host: *fo.ip,
+ Port: uint32(*fo.port),
+ Cipher: *fo.cipher,
+ SaveToFilerLimit: int64(*fo.saveToFilerLimit),
+ Filers: peers,
+ ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
new file mode 100644
index 000000000..888b46fe7
--- /dev/null
+++ b/weed/command/filer_backup.go
@@ -0,0 +1,157 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+type FilerBackupOptions struct {
+ isActivePassive *bool
+ filer *string
+ path *string
+ debug *bool
+ proxyByFiler *bool
+ timeAgo *time.Duration
+}
+
+var (
+ filerBackupOptions FilerBackupOptions
+)
+
+func init() {
+ cmdFilerBackup.Run = runFilerBackup // break init cycle
+ filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
+ filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
+ filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
+ filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
+ filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerBackup = &Command{
+ UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
+ Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
+ Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
+
+ filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
+ and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
+
+ If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
+ A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
+
+`,
+}
+
+func runFilerBackup(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("replication", true)
+
+ for {
+ err := doFilerBackup(grpcDialOption, &filerBackupOptions)
+ if err != nil {
+ glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+
+ return true
+}
+
+const (
+ BackupKeyPrefix = "backup."
+)
+
+func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
+
+ // find data sink
+ config := util.GetViper()
+ dataSink := findSink(config)
+ if dataSink == nil {
+ return fmt.Errorf("no data sink configured in replication.toml")
+ }
+
+ sourceFiler := *backupOption.filer
+ sourcePath := *backupOption.path
+ timeAgo := *backupOption.timeAgo
+ targetPath := dataSink.GetSinkToDirectory()
+ debug := *backupOption.debug
+
+ // get start time for the data sink
+ startFrom := time.Unix(0, 0)
+ sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
+ if timeAgo.Milliseconds() == 0 {
+ lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
+ if err != nil {
+ glog.V(0).Infof("starting from %v", startFrom)
+ } else {
+ startFrom = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resuming from %v", startFrom)
+ }
+ } else {
+ startFrom = time.Now().Add(-timeAgo)
+ glog.V(0).Infof("start time is set to %v", startFrom)
+ }
+
+ // create filer sink
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
+ dataSink.SetSourceFiler(filerSource)
+
+ processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
+
+ return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "backup_" + dataSink.GetName(),
+ PathPrefix: sourcePath,
+ SinceNs: startFrom.UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("processEventFn: %v", err)
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
+ return fmt.Errorf("setOffset: %v", err)
+ }
+ }
+
+ }
+
+ })
+
+}
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index a46098b04..c4281feba 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -110,7 +110,7 @@ func runFilerCat(cmd *Command, args []string) bool {
filerCat.filerClient = client
- return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+ return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64, false)
})
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 42f5ec4c3..a36bb8cea 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -56,7 +56,7 @@ func init() {
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
- copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
+ copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
new file mode 100644
index 000000000..ba0b44659
--- /dev/null
+++ b/weed/command/filer_meta_backup.go
@@ -0,0 +1,268 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
+ "io"
+ "reflect"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ metaBackup FilerMetaBackupOptions
+)
+
+type FilerMetaBackupOptions struct {
+ grpcDialOption grpc.DialOption
+ filerAddress *string
+ filerDirectory *string
+ restart *bool
+ backupFilerConfig *string
+
+ store filer.FilerStore
+}
+
+func init() {
+ cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
+ metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
+ metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
+ metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
+}
+
+var cmdFilerMetaBackup = &Command{
+ UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
+ Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
+ Long: `continuously backup filer meta data changes.
+The backup writes to another filer store specified in a backup_filer.toml.
+
+ weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
+ weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
+
+ `,
+}
+
+func runFilerMetaBackup(cmd *Command, args []string) bool {
+
+ metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ // load backup_filer.toml
+ v := viper.New()
+ v.SetConfigFile(*metaBackup.backupFilerConfig)
+
+ if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
+ glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
+ " weed scaffold -config=%s -output=.\n\n\n",
+ *metaBackup.backupFilerConfig, "backup_filer", "filer")
+ }
+
+ if err := metaBackup.initStore(v); err != nil {
+ glog.V(0).Infof("init backup filer store: %v", err)
+ return true
+ }
+
+ missingPreviousBackup := false
+ _, err := metaBackup.getOffset()
+ if err != nil {
+ missingPreviousBackup = true
+ }
+
+ if *metaBackup.restart || missingPreviousBackup {
+ glog.V(0).Infof("traversing metadata tree...")
+ startTime := time.Now()
+ if err := metaBackup.traverseMetadata(); err != nil {
+ glog.Errorf("traverse meta data: %v", err)
+ return true
+ }
+ glog.V(0).Infof("metadata copied up to %v", startTime)
+ if err := metaBackup.setOffset(startTime); err != nil {
+ startTime = time.Now()
+ }
+ }
+
+ for {
+ err := metaBackup.streamMetadataBackup()
+ if err != nil {
+ glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+
+ return true
+}
+
+func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
+ // load configuration for default filer store
+ hasDefaultStoreConfigured := false
+ for _, store := range filer.Stores {
+ if v.GetBool(store.GetName() + ".enabled") {
+ store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
+ if err := store.Initialize(v, store.GetName()+"."); err != nil {
+ glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
+ }
+ glog.V(0).Infof("configured filer store to %s", store.GetName())
+ hasDefaultStoreConfigured = true
+ metaBackup.store = filer.NewFilerStoreWrapper(store)
+ break
+ }
+ }
+ if !hasDefaultStoreConfigured {
+ return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
+ }
+
+ return nil
+}
+
+func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
+ var saveErr error
+
+ traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+
+ println("+", parentPath.Child(entry.Name))
+ if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
+ saveErr = fmt.Errorf("insert entry error: %v\n", err)
+ return
+ }
+
+ })
+
+ if traverseErr != nil {
+ return fmt.Errorf("traverse: %v", traverseErr)
+ }
+ return saveErr
+}
+
+var (
+ MetaBackupKey = []byte("metaBackup")
+)
+
+func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
+
+ startTime, err := metaBackup.getOffset()
+ if err != nil {
+ startTime = time.Now()
+ }
+ glog.V(0).Infof("streaming from %v", startTime)
+
+ store := metaBackup.store
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+
+ ctx := context.Background()
+ message := resp.EventNotification
+
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.InsertEntry(ctx, entry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.UpdateEntry(ctx, entry)
+ }
+ println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
+ return err
+ }
+ println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
+ }
+
+ return nil
+ }
+
+ tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "meta_backup",
+ PathPrefix: *metaBackup.filerDirectory,
+ SinceNs: startTime.UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ if err = eachEntryFunc(resp); err != nil {
+ return err
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
+ return err2
+ }
+ }
+
+ }
+
+ })
+ return tailErr
+}
+
+func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
+ value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
+ if err != nil {
+ return
+ }
+ tsNs := util.BytesToUint64(value)
+
+ return time.Unix(0, int64(tsNs)), nil
+}
+
+func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
+ valueBuf := make([]byte, 8)
+ util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
+
+ if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
+ return err
+ }
+ return nil
+}
+
+var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
+
+func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+
+}
+
+func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index f055b19a8..8451ffd78 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -23,9 +23,9 @@ func init() {
}
var cmdFilerMetaTail = &Command{
- UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
- Short: "see recent changes on a filer",
- Long: `See recent changes on a filer.
+ UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
+ Short: "see continuous changes on a filer",
+ Long: `See continuous changes on a filer.
weed filer.meta.tail -timeAgo=30h | grep truncate
weed filer.meta.tail -timeAgo=30h | jq .
@@ -36,7 +36,7 @@ var cmdFilerMetaTail = &Command{
var (
tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
- tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index e8c06b208..885c95540 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
- var dataSink sink.ReplicationSink
- for _, sk := range sink.Sinks {
- if config.GetBool("sink." + sk.GetName() + ".enabled") {
- if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize sink for %s: %+v",
- sk.GetName(), err)
- }
- glog.V(0).Infof("Configure sink to %s", sk.GetName())
- dataSink = sk
- break
- }
- }
+ dataSink := findSink(config)
if dataSink == nil {
println("no data sink configured in replication.toml:")
@@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
+func findSink(config *util.ViperProxy) sink.ReplicationSink {
+ var dataSink sink.ReplicationSink
+ for _, sk := range sink.Sinks {
+ if config.GetBool("sink." + sk.GetName() + ".enabled") {
+ if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
+ glog.Fatalf("Failed to initialize sink for %s: %+v",
+ sk.GetName(), err)
+ }
+ glog.V(0).Infof("Configure sink to %s", sk.GetName())
+ dataSink = sk
+ break
+ }
+ }
+ return dataSink
+}
+
func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index f8beb6fda..0f34e5701 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// if first time, start from now
// if has previously synced, resume from that point of time
- sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
+ sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
if err != nil {
return err
}
@@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
+ persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
+
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
-
- var sourceOldKey, sourceNewKey util.FullPath
- if message.OldEntry != nil {
- sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
- }
- if message.NewEntry != nil {
- sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
- }
-
for _, sig := range message.Signatures {
if sig == targetFilerSignature && targetFilerSignature != 0 {
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
return nil
}
}
- if debug {
- fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
- }
-
- if !strings.HasPrefix(resp.Directory, sourcePath) {
- return nil
- }
-
- // handle deletions
- if message.OldEntry != nil && message.NewEntry == nil {
- if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
- return nil
- }
- key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
-
- // handle new entries
- if message.OldEntry == nil && message.NewEntry != nil {
- if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
- return nil
- }
- key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
- }
-
- // this is something special?
- if message.OldEntry == nil && message.NewEntry == nil {
- return nil
- }
-
- // handle updates
- if strings.HasPrefix(string(sourceOldKey), sourcePath) {
- // old key is in the watched directory
- if strings.HasPrefix(string(sourceNewKey), sourcePath) {
- // new key is also in the watched directory
- oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
- foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
- if foundExisting {
- return err
- }
-
- // not able to find old entry
- if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
- return fmt.Errorf("delete old entry %v: %v", oldKey, err)
- }
-
- // create the new entry
- newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
-
- } else {
- // new key is outside of the watched directory
- key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
- } else {
- // old key is outside of the watched directory
- if strings.HasPrefix(string(sourceNewKey), sourcePath) {
- // new key is in the watched directory
- key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
- } else {
- // new key is also outside of the watched directory
- // skip
- }
- }
-
- return nil
+ return persistEventFn(resp)
}
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
- if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
+ if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
return err
}
}
@@ -290,11 +215,11 @@ const (
SyncKeyPrefix = "sync."
)
-func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
+func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- syncKey := []byte(SyncKeyPrefix + "____")
- util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+ syncKey := []byte(signaturePrefix + "____")
+ util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
if err != nil {
@@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature
}
-func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
+func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- syncKey := []byte(SyncKeyPrefix + "____")
- util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+ syncKey := []byte(signaturePrefix + "____")
+ util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
valueBuf := make([]byte, 8)
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
@@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur
})
}
+
+func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+ // process function
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ var sourceOldKey, sourceNewKey util.FullPath
+ if message.OldEntry != nil {
+ sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
+ }
+ if message.NewEntry != nil {
+ sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
+ }
+
+ if debug {
+ glog.V(0).Infof("received %v", resp)
+ }
+
+ if !strings.HasPrefix(resp.Directory, sourcePath) {
+ return nil
+ }
+
+ // handle deletions
+ if message.OldEntry != nil && message.NewEntry == nil {
+ if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ return nil
+ }
+ key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+
+ // handle new entries
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ return nil
+ }
+ key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ }
+
+ // this is something special?
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+
+ // handle updates
+ if strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ // old key is in the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is also in the watched directory
+ if !dataSink.IsIncremental() {
+ oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
+ foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
+ if foundExisting {
+ return err
+ }
+
+ // not able to find old entry
+ if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
+ return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ }
+ }
+ // create the new entry
+ newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
+
+ } else {
+ // new key is outside of the watched directory
+ if !dataSink.IsIncremental() {
+ key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ }
+ } else {
+ // old key is outside of the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is in the watched directory
+ key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ } else {
+ // new key is also outside of the watched directory
+ // skip
+ }
+ }
+
+ return nil
+ }
+ return processEventFn
+}
+
+func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
+ if !dataSink.IsIncremental() {
+ return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
+ }
+ var mTime int64
+ if message.NewEntry != nil {
+ mTime = message.NewEntry.Attributes.Mtime
+ } else if message.OldEntry != nil {
+ mTime = message.OldEntry.Attributes.Mtime
+ }
+ dateKey := time.Unix(mTime, 0).Format("2006-01-02")
+ return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
+}
diff --git a/weed/command/master.go b/weed/command/master.go
index d569919cd..0f5e2156d 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -6,7 +6,6 @@ import (
"google.golang.org/grpc/reflection"
"net/http"
"os"
- "runtime"
"sort"
"strconv"
"strings"
@@ -48,8 +47,8 @@ type MasterOptions struct {
func init() {
cmdMaster.Run = runMaster // break init cycle
m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
- m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address")
- m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier")
+ m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to")
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
@@ -86,7 +85,6 @@ func runMaster(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("master", false)
- runtime.GOMAXPROCS(runtime.NumCPU())
grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
parent, _ := util.FullPath(*m.metaFolder).DirAndName()
@@ -138,7 +136,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
- // Create your protocol servers.
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index ea439af7c..5811f0b99 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -25,6 +25,7 @@ type MountOptions struct {
volumeServerAccess *string
uidMap *string
gidMap *string
+ readOnly *bool
}
var (
@@ -45,7 +46,7 @@ func init() {
mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
- mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")
+ mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
@@ -55,6 +56,7 @@ func init() {
mountOptions.volumeServerAccess = cmdMount.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]")
mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
+ mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index a6d562d40..8e5b7a483 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -53,7 +53,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
filer := *option.filer
// parse filer grpc address
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer)
if err != nil {
glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
return true
@@ -63,16 +63,23 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ for i := 0; i < 10; i++ {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
if err != nil {
- return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ glog.V(0).Infof("wait for %d seconds ...", i+1)
+ time.Sleep(time.Duration(i+1) * time.Second)
}
- cipher = resp.Cipher
- return nil
- })
+ }
if err != nil {
- glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err)
return true
}
@@ -196,6 +203,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
+ ReadOnly: *option.readOnly,
})
// mount
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index b4b5855ff..db0b4148d 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -63,7 +63,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer)
if err != nil {
glog.Fatal(err)
return false
diff --git a/weed/command/s3.go b/weed/command/s3.go
index d8e3e306b..c8292a7d5 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -137,7 +137,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 993391a42..4794da18e 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -103,9 +103,9 @@ dir = "./filerrdb" # directory to store rocksdb files
[mysql] # or memsql, tidb
# CREATE TABLE IF NOT EXISTS filemeta (
-# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
-# name VARCHAR(1000) COMMENT 'directory or file name',
-# directory TEXT COMMENT 'full path to parent directory',
+# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
+# directory TEXT COMMENT 'full path to parent directory',
# meta LONGBLOB,
# PRIMARY KEY (dirhash, name)
# ) DEFAULT CHARSET=utf8;
@@ -120,13 +120,16 @@ connection_max_idle = 2
connection_max_open = 100
connection_max_lifetime_seconds = 0
interpolateParams = false
+# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
[mysql2] # or memsql, tidb
enabled = false
createTable = """
CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
dirhash BIGINT,
- name VARCHAR(1000),
+ name VARCHAR(1000) BINARY,
directory TEXT,
meta LONGBLOB,
PRIMARY KEY (dirhash, name)
@@ -141,6 +144,9 @@ connection_max_idle = 2
connection_max_open = 100
connection_max_lifetime_seconds = 0
interpolateParams = false
+# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
[postgres] # or cockroachdb, YugabyteDB
# CREATE TABLE IF NOT EXISTS filemeta (
@@ -161,6 +167,9 @@ sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
connection_max_lifetime_seconds = 0
+# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
[postgres2]
enabled = false
@@ -183,6 +192,9 @@ sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
connection_max_lifetime_seconds = 0
+# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
[cassandra]
# CREATE TABLE filemeta (
@@ -356,6 +368,9 @@ directory = "/buckets"
[sink.local]
enabled = false
directory = "/data"
+# all replicated files are under modified time as yyyy-mm-dd directories
+# so each date directory contains all new and updated files.
+is_incremental = false
[sink.local_incremental]
# all replicated files are under modified time as yyyy-mm-dd directories
@@ -373,6 +388,7 @@ directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
+is_incremental = false
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
@@ -384,6 +400,7 @@ region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
endpoint = ""
+is_incremental = false
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@@ -391,6 +408,7 @@ enabled = false
google_application_credentials = "/path/to/x.json" # path to json credential file
bucket = "your_bucket_seaweedfs" # an existing bucket
directory = "/" # destination directory
+is_incremental = false
[sink.azure]
# experimental, let me know if it works
@@ -399,6 +417,7 @@ account_name = ""
account_key = ""
container = "mycontainer" # an existing container
directory = "/" # destination directory
+is_incremental = false
[sink.backblaze]
enabled = false
@@ -406,6 +425,7 @@ b2_account_id = ""
b2_master_application_key = ""
bucket = "mybucket" # an existing bucket
directory = "/" # destination directory
+is_incremental = false
`
@@ -432,22 +452,28 @@ expires_after_seconds = 10 # seconds
# the host name is not checked, so the PERM files can be shared.
[grpc]
ca = ""
+# Set wildcard domain for enable TLS authentication by common names
+allowed_wildcard_domain = "" # .mycompany.com
[grpc.volume]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.master]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.filer]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.msg_broker]
cert = ""
key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
# use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
@@ -455,7 +481,6 @@ key = ""
cert = ""
key = ""
-
# volume server https options
# Note: work in progress!
# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
@@ -493,7 +518,7 @@ default = "localhost:8888" # used by maintenance scripts if the scripts needs
[master.sequencer]
-type = "raft" # Choose [raft|etcd] type for storing the file id sequence
+type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
# example : http://127.0.0.1:2379,http://127.0.0.1:2389
sequencer_etcd_urls = "http://127.0.0.1:2379"
diff --git a/weed/command/server.go b/weed/command/server.go
index 611578953..6eb3bf97c 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,9 +2,8 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"os"
- "runtime"
- "runtime/pprof"
"strings"
"time"
@@ -16,6 +15,7 @@ import (
type ServerOptions struct {
cpuprofile *string
+ memprofile *string
v VolumeServerOptions
}
@@ -49,8 +49,8 @@ var cmdServer = &Command{
}
var (
- serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
- serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
+ serverBindIp = cmdServer.Flag.String("ip.bind", "", "ip address to bind to")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
@@ -76,6 +76,7 @@ var (
func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
+ serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file")
masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@@ -93,11 +94,12 @@ func init() {
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
- filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
+ filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 4, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -107,10 +109,12 @@ func init() {
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
+ serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
@@ -137,14 +141,7 @@ func runServer(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("master", false)
- if *serverOptions.cpuprofile != "" {
- f, err := os.Create(*serverOptions.cpuprofile)
- if err != nil {
- glog.Fatal(err)
- }
- pprof.StartCPUProfile(f)
- defer pprof.StopCPUProfile()
- }
+ grace.SetupProfiling(*serverOptions.cpuprofile, *serverOptions.memprofile)
if *isStartingS3 {
*isStartingFiler = true
@@ -156,19 +153,21 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
- _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
- peers := strings.Join(peerList, ",")
- masterOptions.peers = &peers
+ if *isStartingMasterServer {
+ _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
+ peers := strings.Join(peerList, ",")
+ masterOptions.peers = &peers
+ }
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = &peers
+ filerOptions.masters = masterOptions.peers
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
- serverOptions.v.masters = &peers
+ serverOptions.v.masters = masterOptions.peers
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
@@ -189,7 +188,6 @@ func runServer(cmd *Command, args []string) bool {
webdavOptions.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
- runtime.GOMAXPROCS(runtime.NumCPU())
go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
folders := strings.Split(*volumeDataFolders, ",")
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 67fde2185..0f9361b40 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -43,7 +43,7 @@ func init() {
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
+ upload.maxMB = cmdUpload.Flag.Int("maxMB", 4, "split files larger than the limit")
upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
}
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 659c93d96..9df500178 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -6,7 +6,6 @@ import (
"net/http"
httppprof "net/http/pprof"
"os"
- "runtime"
"runtime/pprof"
"strconv"
"strings"
@@ -36,41 +35,43 @@ var (
)
type VolumeServerOptions struct {
- port *int
- publicPort *int
- folders []string
- folderMaxLimits []int
- idxFolder *string
- ip *string
- publicUrl *string
- bindIp *string
- masters *string
- idleConnectionTimeout *int
- dataCenter *string
- rack *string
- whiteList []string
- indexType *string
- diskType *string
- fixJpgOrientation *bool
- readRedirect *bool
- cpuProfile *string
- memProfile *string
- compactionMBPerSecond *int
- fileSizeLimitMB *int
- minFreeSpacePercents []float32
- pprof *bool
- preStopSeconds *int
- metricsHttpPort *int
+ port *int
+ publicPort *int
+ folders []string
+ folderMaxLimits []int
+ idxFolder *string
+ ip *string
+ publicUrl *string
+ bindIp *string
+ masters *string
+ idleConnectionTimeout *int
+ dataCenter *string
+ rack *string
+ whiteList []string
+ indexType *string
+ diskType *string
+ fixJpgOrientation *bool
+ readRedirect *bool
+ cpuProfile *string
+ memProfile *string
+ compactionMBPerSecond *int
+ fileSizeLimitMB *int
+ concurrentUploadLimitMB *int
+ minFreeSpacePercents []float32
+ pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
+ enableTcp *bool
}
func init() {
cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
- v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
+ v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
- v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
@@ -85,9 +86,11 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
+ v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port")
}
var cmdVolume = &Command{
@@ -109,8 +112,6 @@ func runVolume(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- runtime.GOMAXPROCS(runtime.NumCPU())
-
// If --pprof is set we assume the caller wants to be able to collect
// cpu and memory profiles via go tool pprof
if !*v.pprof {
@@ -238,6 +239,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.fixJpgOrientation, *v.readRedirect,
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
+ int64(*v.concurrentUploadLimitMB)*1024*1024,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -251,6 +253,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
+ // starting tcp server
+ if *v.enableTcp {
+ go v.startTcpService(volumeServer)
+ }
+
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
@@ -368,3 +375,22 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}()
return clusterHttpServer
}
+
+func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
+ listener, e := util.NewListener(listeningAddress, 0)
+ if e != nil {
+ glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
+ }
+ defer listener.Close()
+
+ for {
+ c, err := listener.Accept()
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ go volumeServer.HandleTcpConnection(c)
+ }
+}
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 2bd4a3c61..781ea1e36 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -78,7 +78,7 @@ func (wo *WebDavOption) startWebDav() bool {
}
// parse filer grpc address
- filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
+ filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer)
if err != nil {
glog.Fatal(err)
return false