diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 4 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 10 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 10 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 19 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 8 | ||||
| -rw-r--r-- | weed/replication/sink/replication_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 18 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 15 | ||||
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 14 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 4 | ||||
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 12 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 14 | ||||
| -rw-r--r-- | weed/replication/sub/notifications.go | 2 |
13 files changed, 67 insertions, 65 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 7353cdc91..a0ef6591c 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -18,10 +18,10 @@ type Replicator struct { source *source.FilerSource } -func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator { +func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator { source := &source.FilerSource{} - source.Initialize(sourceConfig) + source.Initialize(sourceConfig, configPrefix) dataSink.SetSourceFiler(source) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 6381908a1..a0b1a41ab 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string { return g.dir } -func (g *AzureSink) Initialize(configuration util.Configuration) error { +func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("account_name"), - configuration.GetString("account_key"), - configuration.GetString("container"), - configuration.GetString("directory"), + configuration.GetString(prefix+"account_name"), + configuration.GetString(prefix+"account_key"), + configuration.GetString(prefix+"container"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 35c2230fa..8c80a64bd 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string { return g.dir } -func (g *B2Sink) Initialize(configuration util.Configuration) error { +func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("b2_account_id"), - configuration.GetString("b2_master_application_key"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"b2_account_id"), + configuration.GetString(prefix+"b2_master_application_key"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 4790d1562..de99fbe1c 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -3,10 +3,11 @@ package filersink import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -38,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string { return fs.dir } -func (fs *FilerSink) Initialize(configuration util.Configuration) error { +func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), - configuration.GetString("replication"), - configuration.GetString("collection"), - configuration.GetInt("ttlSec"), + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), + configuration.GetString(prefix+"replication"), + configuration.GetString(prefix+"collection"), + configuration.GetInt(prefix+"ttlSec"), ) } @@ -59,7 +60,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index abd7c49b9..5aa978ab8 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -34,11 +34,11 @@ func (g *GcsSink) GetSinkToDirectory() string { return g.dir } -func (g *GcsSink) Initialize(configuration util.Configuration) error { +func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index dd54f0005..208bbdf87 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -9,7 +9,7 @@ import ( type ReplicationSink interface { GetName() string - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 4cff341d0..e4e097c0f 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -39,16 +39,16 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { return s3sink.dir } -func (s3sink *S3Sink) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region")) - glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket")) - glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory")) +func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) + glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) return s3sink.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index aef13be75..c3ea44671 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -3,13 +3,14 @@ package source import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "google.golang.org/grpc" "io" "net/http" "strings" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -25,17 +26,17 @@ type FilerSource struct { Dir string } -func (fs *FilerSource) Initialize(configuration util.Configuration) error { +func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), ) } func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.Dir = dir - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index bed26c79c..06869e619 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -27,14 +27,14 @@ func (k *AwsSqsInput) GetName() string { return "aws_sqs" } -func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index eddba9ff8..9726096e5 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -27,8 +27,8 @@ func (k *GoCDKPubSubInput) GetName() string { return "gocdk_pub_sub" } -func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error { - subURL := config.GetString("sub_url") +func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { + subURL := configuration.GetString(prefix + "sub_url") glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) sub, err := pubsub.OpenSubscription(context.Background(), subURL) if err != nil { diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index ad6b42a2e..a950bb42b 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string { return "google_pub_sub" } -func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index 1a86a8307..fa9cfad9b 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string { return "kafka" } -func (k *KafkaInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), - configuration.GetString("offsetFile"), - configuration.GetInt("offsetSaveIntervalSeconds"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), + configuration.GetString(prefix+"offsetFile"), + configuration.GetInt(prefix+"offsetSaveIntervalSeconds"), ) } diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go index 66fbef824..8a2668f98 100644 --- a/weed/replication/sub/notifications.go +++ b/weed/replication/sub/notifications.go @@ -9,7 +9,7 @@ type NotificationInput interface { // GetName gets the name to locate the configuration in sync.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) } |
