aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/filer.go1
-rw-r--r--weed/command/filer_backup.go51
-rw-r--r--weed/command/filer_meta_backup.go48
-rw-r--r--weed/command/filer_meta_tail.go36
-rw-r--r--weed/command/filer_remote_sync.go260
-rw-r--r--weed/command/filer_replication.go6
-rw-r--r--weed/command/filer_sync.go49
-rw-r--r--weed/command/imports.go31
-rw-r--r--weed/command/shell.go1
10 files changed, 320 insertions, 164 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 9ae93fe61..02de2bd35 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -21,6 +21,7 @@ var Commands = []*Command{
cmdFilerCopy,
cmdFilerMetaBackup,
cmdFilerMetaTail,
+ cmdFilerRemoteSynchronize,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 4fd2f9c72..ddee0852c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -3,7 +3,6 @@ package command
import (
"fmt"
"net/http"
- _ "net/http/pprof"
"os"
"strconv"
"strings"
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index fc4dd8298..2828ccb39 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
package command
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
"time"
)
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "backup_" + dataSink.GetName(),
- PathPrefix: sourcePath,
- SinceNs: startFrom.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
-
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("processEventFn: %v", err)
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
- return fmt.Errorf("setOffset: %v", err)
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_" + dataSink.GetName(),
+ sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 28bd367e7..108e76566 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
- "io"
"reflect"
"time"
@@ -190,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
- tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "meta_backup",
- PathPrefix: *metaBackup.filerDirectory,
- SinceNs: startTime.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
- return err2
- }
- }
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
+ return metaBackup.setOffset(lastTime)
+ })
- }
+ return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+ *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
- })
- return tailErr
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 76699bb5e..28c0db99b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -3,16 +3,15 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
- "io"
"os"
"path/filepath"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -104,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *tailTarget,
- SinceNs: time.Now().Add(-*tailStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+ *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+ func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
- continue
+ return nil
}
- if err = eachEntryFunc(resp); err != nil {
+ if err := eachEntryFunc(resp); err != nil {
return err
}
- }
+ return nil
+ }, false)
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
new file mode 100644
index 000000000..4afb7c091
--- /dev/null
+++ b/weed/command/filer_remote_sync.go
@@ -0,0 +1,260 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+ "strings"
+ "time"
+)
+
+type RemoteSyncOptions struct {
+ filerAddress *string
+ grpcDialOption grpc.DialOption
+ readChunkFromFiler *bool
+ debug *bool
+ timeAgo *time.Duration
+ dir *string
+}
+
+const (
+ RemoteSyncKeyPrefix = "remote.sync."
+)
+
+var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
+
+func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+}
+func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
+var (
+ remoteSyncOptions RemoteSyncOptions
+)
+
+func init() {
+ cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
+ remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
+ remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+ remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
+ remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
+ remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerRemoteSynchronize = &Command{
+ UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
+ Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage",
+ Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage
+
+ filer.remote.sync listens on filer update events.
+ If any mounted remote file is updated, it will fetch the updated content,
+ and write to the remote storage.
+`,
+}
+
+func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ remoteSyncOptions.grpcDialOption = grpcDialOption
+
+ // read filer remote storage mount mappings
+ mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
+ if readErr != nil {
+ fmt.Printf("read mount mapping: %v", readErr)
+ return false
+ }
+
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(
+ *remoteSyncOptions.filerAddress,
+ pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+ "/", // does not matter
+ *remoteSyncOptions.readChunkFromFiler,
+ )
+
+ var found bool
+ for dir, remoteStorageMountLocation := range mappings.Mappings {
+ if *remoteSyncOptions.dir == dir {
+ found = true
+ storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
+ if readErr != nil {
+ fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
+ continue
+ }
+ fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
+ if err := util.Retry("filer.remote.sync "+dir, func() error {
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
+ }); err != nil {
+ fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
+ }
+ break
+ }
+ }
+ if !found {
+ fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
+ return false
+ }
+
+ return true
+}
+
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
+
+ dirHash := util.HashStringToLong(mountedDir)
+
+ // 1. specified by timeAgo
+ // 2. last offset timestamp for this directory
+ // 3. directory creation time
+ var lastOffsetTs time.Time
+ if *option.timeAgo == 0 {
+ mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ if err != nil {
+ return fmt.Errorf("lookup %s: %v", mountedDir, err)
+ }
+
+ lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
+ } else {
+ lastOffsetTs = time.Now().Add(-*option.timeAgo)
+ }
+
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return err
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if len(message.NewEntry.Chunks) == 0 {
+ return nil
+ }
+ fmt.Printf("create: %+v\n", resp)
+ if !shouldSendToRemote(message.NewEntry) {
+ fmt.Printf("skipping creating: %+v\n", resp)
+ return nil
+ }
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ fmt.Printf("delete: %+v\n", resp)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ return client.DeleteFile(dest)
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if !shouldSendToRemote(message.NewEntry) {
+ fmt.Printf("skipping updating: %+v\n", resp)
+ return nil
+ }
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
+ fmt.Printf("update meta: %+v\n", resp)
+ return client.UpdateFileMetadata(dest, message.NewEntry)
+ }
+ }
+ fmt.Printf("update: %+v\n", resp)
+ if err := client.DeleteFile(oldDest); err != nil {
+ return err
+ }
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+
+ return nil
+ }
+
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+ return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
+ })
+
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
+ "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
+ var dest string
+ source := string(sourcePath[len(mountDir):])
+ if strings.HasSuffix(remoteMountLocation.Path, "/") {
+ dest = remoteMountLocation.Path + source[1:]
+ } else {
+ dest = remoteMountLocation.Path + source
+ }
+ return &filer_pb.RemoteStorageLocation{
+ Name: remoteMountLocation.Name,
+ Bucket: remoteMountLocation.Bucket,
+ Path: dest,
+ }
+}
+
+func isSameChunks(a, b []*filer_pb.FileChunk) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i := 0; i < len(a); i++ {
+ x, y := a[i], b[i]
+ if !proto.Equal(x, y) {
+ return false
+ }
+ }
+ return true
+}
+
+func shouldSendToRemote(entry *filer_pb.Entry) bool {
+ if entry.RemoteEntry == nil {
+ return true
+ }
+ if entry.RemoteEntry.Size != int64(filer.FileSize(entry)) {
+ return true
+ }
+ if entry.RemoteEntry.LastModifiedAt < entry.Attributes.Mtime {
+ return true
+ }
+ return false
+}
+
+func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+ entry.RemoteEntry = remoteEntry
+ return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ return err
+ })
+} \ No newline at end of file
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 885c95540..bf0a3e140 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -7,12 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 7cfc8a7fe..a20f17201 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
- "io"
"strings"
"time"
)
@@ -166,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "syncTo_" + targetFiler,
- PathPrefix: sourcePath,
- SinceNs: sourceFilerOffsetTsNs,
- Signature: targetFilerSignature,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
- return err
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_" + targetFiler,
+ sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+
}
const (
diff --git a/weed/command/imports.go b/weed/command/imports.go
new file mode 100644
index 000000000..d7ade1379
--- /dev/null
+++ b/weed/command/imports.go
@@ -0,0 +1,31 @@
+package command
+
+import (
+ _ "net/http/pprof"
+
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
+
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+
+) \ No newline at end of file
diff --git a/weed/command/shell.go b/weed/command/shell.go
index c9976e809..4a9f4b027 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -55,6 +55,7 @@ func runShell(command *Command, args []string) bool {
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+ shellOptions.FilerAddress = *shellInitialFiler
if err != nil {
fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
return false