aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-09-23 00:40:36 -0700
committerChris Lu <chris.lu@gmail.com>2018-09-23 00:40:36 -0700
commit9fe24991d5b5be0cd3f56cbb65883c67c20fdfe6 (patch)
treebe465d1ceae0b8968986b09c13cf2ac2dcca5873
parent7d6b2a4740c32c3a07a4ce4204da2debee371bcd (diff)
downloadseaweedfs-9fe24991d5b5be0cd3f56cbb65883c67c20fdfe6.tar.xz
seaweedfs-9fe24991d5b5be0cd3f56cbb65883c67c20fdfe6.zip
refactoring
-rw-r--r--weed/command/filer_replication.go13
-rw-r--r--weed/replication/replicator.go12
-rw-r--r--weed/replication/sink/filersink/fetch_write.go (renamed from weed/replication/sink/fetch_write.go)2
-rw-r--r--weed/replication/sink/filersink/filer_sink.go (renamed from weed/replication/sink/filer_sink.go)10
-rw-r--r--weed/replication/sink/replication_sink.go14
5 files changed, 31 insertions, 20 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 9f8f4442a..b19597245 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/spf13/viper"
+ "strings"
)
func init() {
@@ -44,6 +45,18 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
+ // avoid recursive replication
+ if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
+ sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
+ if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
+ fromDir := sourceConfig.GetString("directory")
+ toDir := sinkConfig.GetString("directory")
+ if strings.HasPrefix(toDir, fromDir) {
+ glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
+ }
+ }
+ }
+
replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer"))
for {
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 3e4bccc10..5884bd35b 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -3,9 +3,9 @@ package replication
import (
"strings"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -17,20 +17,12 @@ type Replicator struct {
func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator {
- sink := &sink.FilerSink{}
+ sink := &filersink.FilerSink{}
sink.Initialize(sinkConfig)
source := &source.FilerSource{}
source.Initialize(sourceConfig)
- if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
- fromDir := sourceConfig.GetString("directory")
- toDir := sinkConfig.GetString("directory")
- if strings.HasPrefix(toDir, fromDir) {
- glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
- }
- }
-
sink.SetSourceFiler(source)
return &Replicator{
diff --git a/weed/replication/sink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index ef7c201c9..c14566723 100644
--- a/weed/replication/sink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -1,4 +1,4 @@
-package sink
+package filersink
import (
"context"
diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index f0a7e68d3..1cbf52864 100644
--- a/weed/replication/sink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -1,4 +1,4 @@
-package sink
+package filersink
import (
"context"
@@ -11,14 +11,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-type ReplicationSink interface {
- DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error
- CreateEntry(key string, entry *filer_pb.Entry) error
- UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
- GetSinkToDirectory() string
- SetSourceFiler(s *source.FilerSource)
-}
-
type FilerSink struct {
filerSource *source.FilerSource
grpcAddress string
diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go
new file mode 100644
index 000000000..bb4a8aa83
--- /dev/null
+++ b/weed/replication/sink/replication_sink.go
@@ -0,0 +1,14 @@
+package sink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+)
+
+type ReplicationSink interface {
+ DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error
+ CreateEntry(key string, entry *filer_pb.Entry) error
+ UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
+ GetSinkToDirectory() string
+ SetSourceFiler(s *source.FilerSource)
+}