aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-07-27 19:22:57 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-07-27 19:22:57 +0500
commit7e09a548a64a6ffd62f40e8b38a96d6b20f9828b (patch)
tree490f9a56b8308a3b116a220128c887c210ab3113
parentf401b996eb421294920fe048df5b1b646a558324 (diff)
downloadseaweedfs-7e09a548a64a6ffd62f40e8b38a96d6b20f9828b.tar.xz
seaweedfs-7e09a548a64a6ffd62f40e8b38a96d6b20f9828b.zip
exclude directories to sync on filer
-rw-r--r--Makefile2
-rw-r--r--weed/command/filer_backup.go12
-rw-r--r--weed/command/filer_sync.go58
-rw-r--r--weed/command/scaffold/replication.toml2
-rw-r--r--weed/replication/replicator.go17
5 files changed, 74 insertions, 17 deletions
diff --git a/Makefile b/Makefile
index aa736edee..315a62428 100644
--- a/Makefile
+++ b/Makefile
@@ -10,5 +10,5 @@ install:
full_install:
cd weed; go install -tags "elastic gocdk sqlite ydb tikv"
-test:
+tests:
cd weed; go test -tags "elastic gocdk sqlite ydb tikv" -v ./...
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 90bc8c5c3..62477227b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
+ "strings"
"time"
)
@@ -15,6 +16,7 @@ type FilerBackupOptions struct {
isActivePassive *bool
filer *string
path *string
+ excludePaths *string
debug *bool
proxyByFiler *bool
timeAgo *time.Duration
@@ -28,6 +30,7 @@ func init() {
cmdFilerBackup.Run = runFilerBackup // break init cycle
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
+ filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
@@ -84,6 +87,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
sourceFiler := pb.ServerAddress(*backupOption.filer)
sourcePath := *backupOption.path
+ excludePaths := strings.Split(*backupOption.excludePaths, ",")
timeAgo := *backupOption.timeAgo
targetPath := dataSink.GetSinkToDirectory()
debug := *backupOption.debug
@@ -106,10 +110,14 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
// create filer sink
filerSource := &source.FilerSource{}
- filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler)
+ filerSource.DoInitialize(
+ sourceFiler.ToHttpAddress(),
+ sourceFiler.ToGrpcAddress(),
+ sourcePath,
+ *backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
- processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
+ processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, dataSink, debug)
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 130138a3d..abf13a81d 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -26,7 +26,9 @@ type SyncOptions struct {
filerA *string
filerB *string
aPath *string
+ aExcludePaths *string
bPath *string
+ bExcludePaths *string
aReplication *string
bReplication *string
aCollection *string
@@ -58,7 +60,9 @@ func init() {
syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
+ syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
+ syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
@@ -133,9 +137,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
for {
syncOptions.clientEpoch++
- err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
- *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
- *syncOptions.bDebug, aFilerSignature, bFilerSignature)
+ err := doSubscribeFilerMetaChanges(
+ syncOptions.clientId,
+ syncOptions.clientEpoch,
+ grpcDialOption,
+ filerA,
+ *syncOptions.aPath,
+ strings.Split(*syncOptions.aExcludePaths, ","),
+ *syncOptions.aProxyByFiler,
+ filerB,
+ *syncOptions.bPath,
+ *syncOptions.bReplication,
+ *syncOptions.bCollection,
+ *syncOptions.bTtlSec,
+ *syncOptions.bProxyByFiler,
+ *syncOptions.bDiskType,
+ *syncOptions.bDebug,
+ aFilerSignature,
+ bFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@@ -154,9 +173,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
syncOptions.clientEpoch++
- err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
- *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
- *syncOptions.aDebug, bFilerSignature, aFilerSignature)
+ err := doSubscribeFilerMetaChanges(
+ syncOptions.clientId,
+ syncOptions.clientEpoch,
+ grpcDialOption,
+ filerB,
+ *syncOptions.bPath,
+ strings.Split(*syncOptions.bExcludePaths, ","),
+ *syncOptions.bProxyByFiler,
+ filerA,
+ *syncOptions.aPath,
+ *syncOptions.aReplication,
+ *syncOptions.aCollection,
+ *syncOptions.aTtlSec,
+ *syncOptions.aProxyByFiler,
+ *syncOptions.aDiskType,
+ *syncOptions.aDebug,
+ bFilerSignature,
+ aFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@@ -186,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
return nil
}
-func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
@@ -205,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
- persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
+ persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
@@ -302,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
-func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
// process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
@@ -322,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
if !strings.HasPrefix(resp.Directory, sourcePath) {
return nil
}
-
+ for _, excludePath := range excludePaths {
+ if strings.HasPrefix(resp.Directory, excludePath) {
+ return nil
+ }
+ }
// handle deletions
if filer_pb.IsDelete(resp) {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml
index c463c8077..cffe1b76f 100644
--- a/weed/command/scaffold/replication.toml
+++ b/weed/command/scaffold/replication.toml
@@ -13,6 +13,8 @@ grpcAddress = "localhost:18888"
# this is not a directory on your hard drive, but on your filer.
# i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets"
+# files from the directory separated by space are excluded from sending notifications
+excludeDirectories = "/buckets/tmp"
[sink.local]
enabled = false
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index eaab2c13e..8fee23b95 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -16,8 +16,9 @@ import (
)
type Replicator struct {
- sink sink.ReplicationSink
- source *source.FilerSource
+ sink sink.ReplicationSink
+ source *source.FilerSource
+ excludeDirs []string
}
func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
@@ -28,8 +29,9 @@ func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSin
dataSink.SetSourceFiler(source)
return &Replicator{
- sink: dataSink,
- source: source,
+ sink: dataSink,
+ source: source,
+ excludeDirs: sourceConfig.GetStringSlice(configPrefix + "excludeDirectories"),
}
}
@@ -41,6 +43,13 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil
}
+ for _, excludeDir := range r.excludeDirs {
+ if strings.HasPrefix(key, excludeDir) {
+ glog.V(4).Infof("skipping %v of exclude dir %v", key, excludeDir)
+ return nil
+ }
+ }
+
var dateKey string
if r.sink.IsIncremental() {
var mTime int64