aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/replicator.go4
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go10
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go10
-rw-r--r--weed/replication/sink/filersink/filer_sink.go19
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go8
-rw-r--r--weed/replication/sink/replication_sink.go2
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go18
-rw-r--r--weed/replication/source/filer_source.go15
-rw-r--r--weed/replication/sub/notification_aws_sqs.go14
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go4
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go12
-rw-r--r--weed/replication/sub/notification_kafka.go14
-rw-r--r--weed/replication/sub/notifications.go2
13 files changed, 67 insertions, 65 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 7353cdc91..a0ef6591c 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -18,10 +18,10 @@ type Replicator struct {
source *source.FilerSource
}
-func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator {
+func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
source := &source.FilerSource{}
- source.Initialize(sourceConfig)
+ source.Initialize(sourceConfig, configPrefix)
dataSink.SetSourceFiler(source)
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index 6381908a1..a0b1a41ab 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
-func (g *AzureSink) Initialize(configuration util.Configuration) error {
+func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("account_name"),
- configuration.GetString("account_key"),
- configuration.GetString("container"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"account_name"),
+ configuration.GetString(prefix+"account_key"),
+ configuration.GetString(prefix+"container"),
+ configuration.GetString(prefix+"directory"),
)
}
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 35c2230fa..8c80a64bd 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
-func (g *B2Sink) Initialize(configuration util.Configuration) error {
+func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("b2_account_id"),
- configuration.GetString("b2_master_application_key"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"b2_account_id"),
+ configuration.GetString(prefix+"b2_master_application_key"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
)
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 4790d1562..de99fbe1c 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -3,10 +3,11 @@ package filersink
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
+
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/security"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -38,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
-func (fs *FilerSink) Initialize(configuration util.Configuration) error {
+func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
return fs.initialize(
- configuration.GetString("grpcAddress"),
- configuration.GetString("directory"),
- configuration.GetString("replication"),
- configuration.GetString("collection"),
- configuration.GetInt("ttlSec"),
+ configuration.GetString(prefix+"grpcAddress"),
+ configuration.GetString(prefix+"directory"),
+ configuration.GetString(prefix+"replication"),
+ configuration.GetString(prefix+"collection"),
+ configuration.GetInt(prefix+"ttlSec"),
)
}
@@ -59,7 +60,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
- fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
return nil
}
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index abd7c49b9..5aa978ab8 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -34,11 +34,11 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
-func (g *GcsSink) Initialize(configuration util.Configuration) error {
+func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("google_application_credentials"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"google_application_credentials"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
)
}
diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go
index dd54f0005..208bbdf87 100644
--- a/weed/replication/sink/replication_sink.go
+++ b/weed/replication/sink/replication_sink.go
@@ -9,7 +9,7 @@ import (
type ReplicationSink interface {
GetName() string
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error
CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error
UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 4cff341d0..e4e097c0f 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -39,16 +39,16 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
-func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory"))
+func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
+ glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
+ glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
return s3sink.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"aws_access_key_id"),
+ configuration.GetString(prefix+"aws_secret_access_key"),
+ configuration.GetString(prefix+"region"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
)
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index aef13be75..c3ea44671 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -3,13 +3,14 @@ package source
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
"io"
"net/http"
"strings"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -25,17 +26,17 @@ type FilerSource struct {
Dir string
}
-func (fs *FilerSource) Initialize(configuration util.Configuration) error {
+func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
return fs.initialize(
- configuration.GetString("grpcAddress"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"grpcAddress"),
+ configuration.GetString(prefix+"directory"),
)
}
func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
fs.grpcAddress = grpcAddress
fs.Dir = dir
- fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
return nil
}
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index bed26c79c..06869e619 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -27,14 +27,14 @@ func (k *AwsSqsInput) GetName() string {
return "aws_sqs"
}
-func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("sqs_queue_name"),
+ configuration.GetString(prefix+"aws_access_key_id"),
+ configuration.GetString(prefix+"aws_secret_access_key"),
+ configuration.GetString(prefix+"region"),
+ configuration.GetString(prefix+"sqs_queue_name"),
)
}
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index eddba9ff8..9726096e5 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -27,8 +27,8 @@ func (k *GoCDKPubSubInput) GetName() string {
return "gocdk_pub_sub"
}
-func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error {
- subURL := config.GetString("sub_url")
+func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
if err != nil {
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index ad6b42a2e..a950bb42b 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string {
return "google_pub_sub"
}
-func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
+func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetString("google_application_credentials"),
- configuration.GetString("project_id"),
- configuration.GetString("topic"),
+ configuration.GetString(prefix+"google_application_credentials"),
+ configuration.GetString(prefix+"project_id"),
+ configuration.GetString(prefix+"topic"),
)
}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 1a86a8307..fa9cfad9b 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string {
return "kafka"
}
-func (k *KafkaInput) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
- glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetStringSlice("hosts"),
- configuration.GetString("topic"),
- configuration.GetString("offsetFile"),
- configuration.GetInt("offsetSaveIntervalSeconds"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ configuration.GetString(prefix+"topic"),
+ configuration.GetString(prefix+"offsetFile"),
+ configuration.GetInt(prefix+"offsetSaveIntervalSeconds"),
)
}
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 66fbef824..8a2668f98 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -9,7 +9,7 @@ type NotificationInput interface {
// GetName gets the name to locate the configuration in sync.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
}