aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2020-07-10 09:44:32 +0800
committerGitHub <noreply@github.com>2020-07-10 09:44:32 +0800
commite67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/replication
parent2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
downloadseaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz
seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/replicator.go21
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go26
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go31
-rw-r--r--weed/replication/sink/filersink/README.txt12
-rw-r--r--weed/replication/sink/filersink/fetch_write.go73
-rw-r--r--weed/replication/sink/filersink/filer_sink.go116
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go24
-rw-r--r--weed/replication/sink/replication_sink.go4
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go49
-rw-r--r--weed/replication/sink/s3sink/s3_write.go5
-rw-r--r--weed/replication/source/filer_source.go37
-rw-r--r--weed/replication/sub/notification_aws_sqs.go24
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go50
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go12
-rw-r--r--weed/replication/sub/notification_kafka.go14
-rw-r--r--weed/replication/sub/notifications.go2
16 files changed, 282 insertions, 218 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index ac8235fd5..051199adb 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -1,7 +1,8 @@
package replication
import (
- "path/filepath"
+ "context"
+ "fmt"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -16,10 +17,10 @@ type Replicator struct {
source *source.FilerSource
}
-func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator {
+func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
source := &source.FilerSource{}
- source.Initialize(sourceConfig)
+ source.Initialize(sourceConfig, configPrefix)
dataSink.SetSourceFiler(source)
@@ -29,12 +30,15 @@ func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSin
}
}
-func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error {
+func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_pb.EventNotification) error {
+ if message.IsFromOtherCluster && r.sink.GetName() == "filer" {
+ return nil
+ }
if !strings.HasPrefix(key, r.source.Dir) {
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil
}
- newKey := filepath.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
+ newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
glog.V(3).Infof("replicate %s => %s", key, newKey)
key = newKey
if message.OldEntry != nil && message.NewEntry == nil {
@@ -50,12 +54,17 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification)
return nil
}
- foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks)
+ foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks)
if foundExisting {
glog.V(4).Infof("updated %v", key)
return err
}
+ err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false)
+ if err != nil {
+ return fmt.Errorf("delete old entry %v: %v", key, err)
+ }
+
glog.V(4).Infof("creating missing %v", key)
return r.sink.CreateEntry(key, message.NewEntry)
}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index 7acf37fa5..aef97c06e 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
-func (g *AzureSink) Initialize(configuration util.Configuration) error {
+func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("account_name"),
- configuration.GetString("account_key"),
- configuration.GetString("container"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"account_name"),
+ configuration.GetString(prefix+"account_key"),
+ configuration.GetString(prefix+"container"),
+ configuration.GetString(prefix+"directory"),
)
}
@@ -78,9 +78,7 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo
key = key + "/"
}
- ctx := context.Background()
-
- if _, err := g.containerURL.NewBlobURL(key).Delete(ctx,
+ if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(),
azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
}
@@ -98,15 +96,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
-
- ctx := context.Background()
+ chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
// Create a URL that references a to-be-created blob in your
// Azure Storage account's container.
appendBlobURL := g.containerURL.NewAppendBlobURL(key)
- _, err := appendBlobURL.Create(ctx, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
+ _, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
if err != nil {
return err
}
@@ -119,8 +115,8 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
var writeErr error
- _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
- _, writeErr = appendBlobURL.AppendBlock(ctx, bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
+ readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
})
if readErr != nil {
@@ -136,7 +132,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
-func (g *AzureSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
key = cleanKey(key)
// TODO improve efficiency
return false, nil
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 17f5e39b2..1e7d82ed4 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
-func (g *B2Sink) Initialize(configuration util.Configuration) error {
+func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("b2_account_id"),
- configuration.GetString("b2_master_application_key"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"b2_account_id"),
+ configuration.GetString(prefix+"b2_master_application_key"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
)
}
@@ -45,8 +45,7 @@ func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
}
func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
- ctx := context.Background()
- client, err := b2.NewClient(ctx, accountId, accountKey)
+ client, err := b2.NewClient(context.Background(), accountId, accountKey)
if err != nil {
return err
}
@@ -66,16 +65,14 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool)
key = key + "/"
}
- ctx := context.Background()
-
- bucket, err := g.client.Bucket(ctx, g.bucket)
+ bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil {
return err
}
targetObject := bucket.Object(key)
- return targetObject.Delete(ctx)
+ return targetObject.Delete(context.Background())
}
@@ -88,17 +85,15 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
-
- ctx := context.Background()
+ chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
- bucket, err := g.client.Bucket(ctx, g.bucket)
+ bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil {
return err
}
targetObject := bucket.Object(key)
- writer := targetObject.NewWriter(ctx)
+ writer := targetObject.NewWriter(context.Background())
for _, chunk := range chunkViews {
@@ -108,7 +103,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
var writeErr error
- _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+ readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, err := writer.Write(data)
if err != nil {
writeErr = err
@@ -128,7 +123,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
-func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
key = cleanKey(key)
diff --git a/weed/replication/sink/filersink/README.txt b/weed/replication/sink/filersink/README.txt
new file mode 100644
index 000000000..4ba0fc752
--- /dev/null
+++ b/weed/replication/sink/filersink/README.txt
@@ -0,0 +1,12 @@
+How replication works
+======
+
+All metadata changes within current cluster would be notified to a message queue.
+
+If the meta data change is from other clusters, this metadata would change would not be notified to the message queue.
+
+So active<=>active replication is possible.
+
+
+All metadata changes would be published as metadata changes.
+So all mounts listening for metadata changes will get updated. \ No newline at end of file
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index c14566723..bde29176c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -3,41 +3,46 @@ package filersink
import (
"context"
"fmt"
- "strings"
"sync"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/security"
)
-func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) {
if len(sourceChunks) == 0 {
return
}
+
+ replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
+
var wg sync.WaitGroup
- for _, sourceChunk := range sourceChunks {
+ for chunkIndex, sourceChunk := range sourceChunks {
wg.Add(1)
- go func(chunk *filer_pb.FileChunk) {
+ go func(chunk *filer_pb.FileChunk, index int) {
defer wg.Done()
- replicatedChunk, e := fs.replicateOneChunk(chunk)
+ replicatedChunk, e := fs.replicateOneChunk(chunk, dir)
if e != nil {
err = e
}
- replicatedChunks = append(replicatedChunks, replicatedChunk)
- }(sourceChunk)
+ replicatedChunks[index] = replicatedChunk
+ }(sourceChunk, chunkIndex)
}
wg.Wait()
return
}
-func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
+func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) {
- fileId, err := fs.fetchAndWrite(sourceChunk)
+ fileId, err := fs.fetchAndWrite(sourceChunk, dir)
if err != nil {
- return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
+ return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
}
return &filer_pb.FileChunk{
@@ -46,21 +51,24 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_
Size: sourceChunk.Size,
Mtime: sourceChunk.Mtime,
ETag: sourceChunk.ETag,
- SourceFileId: sourceChunk.FileId,
+ SourceFileId: sourceChunk.GetFileIdString(),
+ CipherKey: sourceChunk.CipherKey,
+ IsCompressed: sourceChunk.IsCompressed,
}, nil
}
-func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
+func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) {
- filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId)
+ filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
if err != nil {
- return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
+ return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
defer readCloser.Close()
var host string
+ var auth security.EncodedJwt
- if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -68,6 +76,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
Collection: fs.collection,
TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter,
+ ParentPath: dir,
}
resp, err := client.AssignVolume(context.Background(), request)
@@ -75,8 +84,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
- fileId, host = resp.FileId, resp.Url
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
return nil
}); err != nil {
@@ -87,8 +99,8 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
- uploadResult, err := operation.Upload(fileUrl, filename, readCloser,
- "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, "")
+ // fetch data as is, regardless whether it is encrypted or not
+ uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
return "", fmt.Errorf("upload data: %v", err)
@@ -101,23 +113,16 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
return
}
-func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&FilerSink{})
- grpcConnection, err := util.GrpcDial(fs.grpcAddress)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
- }
- defer grpcConnection.Close()
+func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.grpcAddress, fs.grpcDialOption)
- return fn(client)
}
-
-func volumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
+func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 2e9cc86d1..50721a8f3 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -4,6 +4,10 @@ import (
"context"
"fmt"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -13,13 +17,14 @@ import (
)
type FilerSink struct {
- filerSource *source.FilerSource
- grpcAddress string
- dir string
- replication string
- collection string
- ttlSec int32
- dataCenter string
+ filerSource *source.FilerSource
+ grpcAddress string
+ dir string
+ replication string
+ collection string
+ ttlSec int32
+ dataCenter string
+ grpcDialOption grpc.DialOption
}
func init() {
@@ -34,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
-func (fs *FilerSink) Initialize(configuration util.Configuration) error {
+func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
return fs.initialize(
- configuration.GetString("grpcAddress"),
- configuration.GetString("directory"),
- configuration.GetString("replication"),
- configuration.GetString("collection"),
- configuration.GetInt("ttlSec"),
+ configuration.GetString(prefix+"grpcAddress"),
+ configuration.GetString(prefix+"directory"),
+ configuration.GetString(prefix+"replication"),
+ configuration.GetString(prefix+"collection"),
+ configuration.GetInt(prefix+"ttlSec"),
)
}
@@ -55,37 +60,28 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
+ fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
return nil
}
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
- return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- dir, name := filer2.FullPath(key).DirAndName()
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir,
- Name: name,
- IsDeleteData: deleteIncludeChunks,
- }
+ dir, name := util.FullPath(key).DirAndName()
- glog.V(1).Infof("delete entry: %v", request)
- _, err := client.DeleteEntry(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("delete entry %s: %v", key, err)
- return fmt.Errorf("delete entry %s: %v", key, err)
- }
-
- return nil
- })
+ glog.V(1).Infof("delete entry: %v", key)
+ err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true)
+ if err != nil {
+ glog.V(0).Infof("delete entry %s: %v", key, err)
+ return fmt.Errorf("delete entry %s: %v", key, err)
+ }
+ return nil
}
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
- return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- dir, name := filer2.FullPath(key).DirAndName()
- ctx := context.Background()
+ dir, name := util.FullPath(key).DirAndName()
// look up existing entry
lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
@@ -93,14 +89,14 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
Name: name,
}
glog.V(1).Infof("lookup: %v", lookupRequest)
- if resp, err := client.LookupDirectoryEntry(ctx, lookupRequest); err == nil {
- if filer2.ETag(resp.Entry.Chunks) == filer2.ETag(entry.Chunks) {
+ if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
+ if filer2.ETag(resp.Entry) == filer2.ETag(entry) {
glog.V(0).Infof("already replicated %s", key)
return nil
}
}
- replicatedChunks, err := fs.replicateChunks(entry.Chunks)
+ replicatedChunks, err := fs.replicateChunks(entry.Chunks, dir)
if err != nil {
glog.V(0).Infof("replicate entry chunks %s: %v", key, err)
@@ -117,10 +113,11 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
Attributes: entry.Attributes,
Chunks: replicatedChunks,
},
+ IsFromOtherCluster: true,
}
glog.V(1).Infof("create: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("create entry %s: %v", key, err)
return fmt.Errorf("create entry %s: %v", key, err)
}
@@ -129,15 +126,13 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
})
}
-func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
-
- ctx := context.Background()
+func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
- dir, name := filer2.FullPath(key).DirAndName()
+ dir, name := util.FullPath(key).DirAndName()
// read existing entry
var existingEntry *filer_pb.Entry
- err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
@@ -145,7 +140,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
}
glog.V(4).Infof("lookup entry: %v", request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
+ resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
glog.V(0).Infof("lookup %s: %v", key, err)
return err
@@ -166,7 +161,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
// skip if already changed
// this usually happens when the messages are not ordered
glog.V(0).Infof("late updates %s", key)
- } else if filer2.ETag(newEntry.Chunks) == filer2.ETag(existingEntry.Chunks) {
+ } else if filer2.ETag(newEntry) == filer2.ETag(existingEntry) {
// skip if no change
// this usually happens when retrying the replication
glog.V(0).Infof("already replicated %s", key)
@@ -177,11 +172,11 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
- existingEntry.Chunks = minusChunks(existingEntry.Chunks, deletedChunks)
+ existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
}
// replicate the chunks that are new in the source
- replicatedChunks, err := fs.replicateChunks(newChunks)
+ replicatedChunks, err := fs.replicateChunks(newChunks, newParentPath)
if err != nil {
return true, fmt.Errorf("replicte %s chunks error: %v", key, err)
}
@@ -189,14 +184,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
}
// save updated meta data
- return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: existingEntry,
+ Directory: newParentPath,
+ Entry: existingEntry,
+ IsFromOtherCluster: true,
}
- if _, err := client.UpdateEntry(ctx, request); err != nil {
+ if _, err := client.UpdateEntry(context.Background(), request); err != nil {
return fmt.Errorf("update existingEntry %s: %v", key, err)
}
@@ -205,23 +201,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
}
func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
- deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks)
- newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks)
- return
-}
-
-func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
- for _, a := range as {
- found := false
- for _, b := range bs {
- if a.FileId == b.FileId {
- found = true
- break
- }
- }
- if !found {
- delta = append(delta, a)
- }
- }
+ deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
+ newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
return
}
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index c1beefc33..bb5a54272 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -6,13 +6,14 @@ import (
"os"
"cloud.google.com/go/storage"
+ "google.golang.org/api/option"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/api/option"
)
type GcsSink struct {
@@ -34,11 +35,11 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
-func (g *GcsSink) Initialize(configuration util.Configuration) error {
+func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("google_application_credentials"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"google_application_credentials"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
)
}
@@ -50,7 +51,6 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
g.bucket = bucketName
g.dir = dir
- ctx := context.Background()
// Creates a client.
if google_application_credentials == "" {
var found bool
@@ -59,7 +59,7 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
}
}
- client, err := storage.NewClient(ctx, option.WithCredentialsFile(google_application_credentials))
+ client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
if err != nil {
glog.Fatalf("Failed to create client: %v", err)
}
@@ -90,11 +90,9 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
-
- ctx := context.Background()
+ chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
- wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx)
+ wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
for _, chunk := range chunkViews {
@@ -103,7 +101,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
return err
}
- _, err = util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+ err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
wc.Write(data)
})
@@ -121,7 +119,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
-func (g *GcsSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
// TODO improve efficiency
return false, nil
}
diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go
index 0a86139d3..6d85f660a 100644
--- a/weed/replication/sink/replication_sink.go
+++ b/weed/replication/sink/replication_sink.go
@@ -8,10 +8,10 @@ import (
type ReplicationSink interface {
GetName() string
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error
CreateEntry(key string, entry *filer_pb.Entry) error
- UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
+ UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 0a4e78318..d7af105b8 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -1,6 +1,7 @@
package S3Sink
import (
+ "context"
"fmt"
"strings"
"sync"
@@ -10,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -23,6 +25,7 @@ type S3Sink struct {
region string
bucket string
dir string
+ endpoint string
filerSource *source.FilerSource
}
@@ -38,16 +41,18 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
-func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory"))
+func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
+ glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
+ glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
+ glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
return s3sink.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"aws_access_key_id"),
+ configuration.GetString(prefix+"aws_secret_access_key"),
+ configuration.GetString(prefix+"region"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
+ configuration.GetString(prefix+"endpoint"),
)
}
@@ -55,16 +60,18 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
s3sink.filerSource = s
}
-func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, bucket, dir string) error {
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint string) error {
s3sink.region = region
s3sink.bucket = bucket
s3sink.dir = dir
+ s3sink.endpoint = endpoint
config := &aws.Config{
- Region: aws.String(s3sink.region),
+ Region: aws.String(s3sink.region),
+ Endpoint: aws.String(s3sink.endpoint),
}
- if awsAccessKeyId != "" && aswSecretAccessKey != "" {
- config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
}
sess, err := session.NewSession(config)
@@ -89,7 +96,6 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
}
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
-
key = cleanKey(key)
if entry.IsDirectory {
@@ -102,21 +108,22 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
totalSize := filer2.TotalSize(entry.Chunks)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
+ chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+
+ parts := make([]*s3.CompletedPart, len(chunkViews))
- var parts []*s3.CompletedPart
var wg sync.WaitGroup
for chunkIndex, chunk := range chunkViews {
partId := chunkIndex + 1
wg.Add(1)
- go func(chunk *filer2.ChunkView) {
+ go func(chunk *filer2.ChunkView, index int) {
defer wg.Done()
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
err = uploadErr
} else {
- parts = append(parts, part)
+ parts[index] = part
}
- }(chunk)
+ }(chunk, chunkIndex)
}
wg.Wait()
@@ -125,11 +132,11 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
return err
}
- return s3sink.completeMultipartUpload(key, uploadId, parts)
+ return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)
}
-func (s3sink *S3Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
key = cleanKey(key)
// TODO improve efficiency
return false, nil
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index 5c4be7aee..c5c65ed5c 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -2,6 +2,7 @@ package S3Sink
import (
"bytes"
+ "context"
"fmt"
"io"
@@ -81,7 +82,7 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
}
// To complete multipart upload
-func (s3sink *S3Sink) completeMultipartUpload(key, uploadId string, parts []*s3.CompletedPart) error {
+func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId string, parts []*s3.CompletedPart) error {
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String(s3sink.bucket),
Key: aws.String(key),
@@ -161,6 +162,6 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, e
return nil, err
}
buf := make([]byte, chunk.Size)
- util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf, true)
+ util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
return bytes.NewReader(buf), nil
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index efe71e706..69c23fe82 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -7,6 +7,11 @@ import (
"net/http"
"strings"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -17,20 +22,22 @@ type ReplicationSource interface {
}
type FilerSource struct {
- grpcAddress string
- Dir string
+ grpcAddress string
+ grpcDialOption grpc.DialOption
+ Dir string
}
-func (fs *FilerSource) Initialize(configuration util.Configuration) error {
+func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
return fs.initialize(
- configuration.GetString("grpcAddress"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"grpcAddress"),
+ configuration.GetString(prefix+"directory"),
)
}
func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
fs.grpcAddress = grpcAddress
fs.Dir = dir
+ fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
return nil
}
@@ -40,7 +47,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
vid := volumeId(part)
- err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read lookup volume id locations: %v", vid)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
@@ -84,17 +91,19 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade
return filename, header, readCloser, err
}
-func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&FilerSource{})
- grpcConnection, err := util.GrpcDial(fs.grpcAddress)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
- }
- defer grpcConnection.Close()
+func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.grpcAddress, fs.grpcDialOption)
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+}
- return fn(client)
+func (fs *FilerSource) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
}
func volumeId(fileId string) string {
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index f0100f4de..1dd386ba7 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -27,24 +27,24 @@ func (k *AwsSqsInput) GetName() string {
return "aws_sqs"
}
-func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("sqs_queue_name"),
+ configuration.GetString(prefix+"aws_access_key_id"),
+ configuration.GetString(prefix+"aws_secret_access_key"),
+ configuration.GetString(prefix+"region"),
+ configuration.GetString(prefix+"sqs_queue_name"),
)
}
-func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) {
config := &aws.Config{
Region: aws.String(region),
}
- if awsAccessKeyId != "" && aswSecretAccessKey != "" {
- config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
}
sess, err := session.NewSession(config)
@@ -92,7 +92,9 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif
}
// process the message
- key = *result.Messages[0].Attributes["key"]
+ // fmt.Printf("messages: %+v\n", result.Messages[0])
+ keyValue := result.Messages[0].MessageAttributes["key"]
+ key = *keyValue.StringValue
text := *result.Messages[0].Body
message = &filer_pb.EventNotification{}
err = proto.UnmarshalText(text, message)
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
new file mode 100644
index 000000000..9726096e5
--- /dev/null
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -0,0 +1,50 @@
+package sub
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "gocloud.dev/pubsub"
+ _ "gocloud.dev/pubsub/awssnssqs"
+ // _ "gocloud.dev/pubsub/azuresb"
+ _ "gocloud.dev/pubsub/gcppubsub"
+ _ "gocloud.dev/pubsub/natspubsub"
+ _ "gocloud.dev/pubsub/rabbitpubsub"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
+}
+
+type GoCDKPubSubInput struct {
+ sub *pubsub.Subscription
+}
+
+func (k *GoCDKPubSubInput) GetName() string {
+ return "gocdk_pub_sub"
+}
+
+func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ subURL := configuration.GetString(prefix + "sub_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
+ sub, err := pubsub.OpenSubscription(context.Background(), subURL)
+ if err != nil {
+ return err
+ }
+ k.sub = sub
+ return nil
+}
+
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+ msg, err := k.sub.Receive(context.Background())
+ key = msg.Metadata["key"]
+ message = &filer_pb.EventNotification{}
+ err = proto.Unmarshal(msg.Body, message)
+ if err != nil {
+ return "", nil, err
+ }
+ return key, message, nil
+}
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index ad6b42a2e..a950bb42b 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string {
return "google_pub_sub"
}
-func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
+func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetString("google_application_credentials"),
- configuration.GetString("project_id"),
- configuration.GetString("topic"),
+ configuration.GetString(prefix+"google_application_credentials"),
+ configuration.GetString(prefix+"project_id"),
+ configuration.GetString(prefix+"topic"),
)
}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 1a86a8307..fa9cfad9b 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string {
return "kafka"
}
-func (k *KafkaInput) Initialize(configuration util.Configuration) error {
- glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
- glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
+ glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetStringSlice("hosts"),
- configuration.GetString("topic"),
- configuration.GetString("offsetFile"),
- configuration.GetInt("offsetSaveIntervalSeconds"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ configuration.GetString(prefix+"topic"),
+ configuration.GetString(prefix+"offsetFile"),
+ configuration.GetInt(prefix+"offsetSaveIntervalSeconds"),
)
}
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 66fbef824..8a2668f98 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -9,7 +9,7 @@ type NotificationInput interface {
// GetName gets the name to locate the configuration in sync.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
}