| author | yourchanges <yourchanges@gmail.com> | 2020-07-10 09:44:32 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-10 09:44:32 +0800 |
| commit | e67096656b0fcdc313c7d8983b6ce36a54d794a3 | |
| tree | 4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 | weed/replication |
| parent | 2b3cef7780a5e91d2072a33411926f9b30c88ee2 | |
| parent | 1b680c06c1de27e6a3899c089ec354a9eb08ea44 | |
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 21 |
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 26 |
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 31 |
| -rw-r--r-- | weed/replication/sink/filersink/README.txt | 12 |
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 73 |
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 116 |
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 24 |
| -rw-r--r-- | weed/replication/sink/replication_sink.go | 4 |
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 49 |
| -rw-r--r-- | weed/replication/sink/s3sink/s3_write.go | 5 |
| -rw-r--r-- | weed/replication/source/filer_source.go | 37 |
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 24 |
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 50 |
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 12 |
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 14 |
| -rw-r--r-- | weed/replication/sub/notifications.go | 2 |
16 files changed, 282 insertions, 218 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index ac8235fd5..051199adb 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -1,7 +1,8 @@
 package replication
 
 import (
-	"path/filepath"
+	"context"
+	"fmt"
 	"strings"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -16,10 +17,10 @@ type Replicator struct {
 	source *source.FilerSource
 }
 
-func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator {
+func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
 
 	source := &source.FilerSource{}
-	source.Initialize(sourceConfig)
+	source.Initialize(sourceConfig, configPrefix)
 
 	dataSink.SetSourceFiler(source)
 
@@ -29,12 +30,15 @@ func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSin
 	}
 }
 
-func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error {
+func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_pb.EventNotification) error {
+	if message.IsFromOtherCluster && r.sink.GetName() == "filer" {
+		return nil
+	}
 	if !strings.HasPrefix(key, r.source.Dir) {
 		glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
 		return nil
 	}
-	newKey := filepath.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
+	newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
 	glog.V(3).Infof("replicate %s => %s", key, newKey)
 	key = newKey
 	if message.OldEntry != nil && message.NewEntry == nil {
@@ -50,12 +54,17 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification)
 		return nil
 	}
 
-	foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks)
+	foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks)
 	if foundExisting {
 		glog.V(4).Infof("updated %v", key)
 		return err
 	}
 
+	err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false)
+	if err != nil {
+		return fmt.Errorf("delete old entry %v: %v", key, err)
+	}
+
 	glog.V(4).Infof("creating missing %v", key)
 	return r.sink.CreateEntry(key, message.NewEntry)
 }
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index 7acf37fa5..aef97c06e 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
 	return g.dir
 }
 
-func (g *AzureSink) Initialize(configuration util.Configuration) error {
+func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
 	return g.initialize(
-		configuration.GetString("account_name"),
-		configuration.GetString("account_key"),
-		configuration.GetString("container"),
-		configuration.GetString("directory"),
+		configuration.GetString(prefix+"account_name"),
+		configuration.GetString(prefix+"account_key"),
+		configuration.GetString(prefix+"container"),
+		configuration.GetString(prefix+"directory"),
 	)
 }
 
@@ -78,9 +78,7 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo
 		key = key + "/"
 	}
 
-	ctx := context.Background()
-
-	if _, err := g.containerURL.NewBlobURL(key).Delete(ctx,
+	if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(),
 		azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
 		return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
 	}
@@ -98,15 +96,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
-
-	ctx := context.Background()
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
 
 	// Create a URL that references a to-be-created blob in your
 	// Azure Storage account's container.
 	appendBlobURL := g.containerURL.NewAppendBlobURL(key)
 
-	_, err := appendBlobURL.Create(ctx, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
+	_, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
 	if err != nil {
 		return err
 	}
@@ -119,8 +115,8 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		}
 
 		var writeErr error
-		_, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
-			_, writeErr = appendBlobURL.AppendBlock(ctx, bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
+		readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+			_, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
 		})
 
 		if readErr != nil {
@@ -136,7 +132,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 
 }
 
-func (g *AzureSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
 	key = cleanKey(key)
 	// TODO improve efficiency
 	return false, nil
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 17f5e39b2..1e7d82ed4 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
 	return g.dir
 }
 
-func (g *B2Sink) Initialize(configuration util.Configuration) error {
+func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
 	return g.initialize(
-		configuration.GetString("b2_account_id"),
-		configuration.GetString("b2_master_application_key"),
-		configuration.GetString("bucket"),
-		configuration.GetString("directory"),
+		configuration.GetString(prefix+"b2_account_id"),
+		configuration.GetString(prefix+"b2_master_application_key"),
+		configuration.GetString(prefix+"bucket"),
+		configuration.GetString(prefix+"directory"),
 	)
 }
 
@@ -45,8 +45,7 @@ func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
 }
 
 func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
-	ctx := context.Background()
-	client, err := b2.NewClient(ctx, accountId, accountKey)
+	client, err := b2.NewClient(context.Background(), accountId, accountKey)
 	if err != nil {
 		return err
 	}
@@ -66,16 +65,14 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool)
 		key = key + "/"
 	}
 
-	ctx := context.Background()
-
-	bucket, err := g.client.Bucket(ctx, g.bucket)
+	bucket, err := g.client.Bucket(context.Background(), g.bucket)
 	if err != nil {
 		return err
 	}
 
 	targetObject := bucket.Object(key)
 
-	return targetObject.Delete(ctx)
+	return targetObject.Delete(context.Background())
 
 }
 
@@ -88,17 +85,15 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
-
-	ctx := context.Background()
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
 
-	bucket, err := g.client.Bucket(ctx, g.bucket)
+	bucket, err := g.client.Bucket(context.Background(), g.bucket)
 	if err != nil {
 		return err
 	}
 
 	targetObject := bucket.Object(key)
-	writer := targetObject.NewWriter(ctx)
+	writer := targetObject.NewWriter(context.Background())
 
 	for _, chunk := range chunkViews {
 
@@ -108,7 +103,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		}
 
 		var writeErr error
-		_, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+		readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
 			_, err := writer.Write(data)
 			if err != nil {
 				writeErr = err
@@ -128,7 +123,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 
 }
 
-func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
 
 	key = cleanKey(key)
 
diff --git a/weed/replication/sink/filersink/README.txt b/weed/replication/sink/filersink/README.txt
new file mode 100644
index 000000000..4ba0fc752
--- /dev/null
+++ b/weed/replication/sink/filersink/README.txt
@@ -0,0 +1,12 @@
+How replication works
+======
+
+All metadata changes within the current cluster are notified to a message queue.
+
+If the metadata change came from another cluster, it is not notified to the message queue again.
+
+So active<=>active replication is possible.
+
+
+All metadata changes are published as metadata changes.
+So all mounts listening for metadata changes will get updated.
\ No newline at end of file
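The README above describes the rule that breaks replication loops; the concrete mechanism in this change is the `IsFromOtherCluster` guard added to `Replicator.Replicate` (see replicator.go above). A minimal sketch of that decision, using only the `filer_pb.EventNotification` field and the "filer" sink name that appear in this diff; the helper name `shouldReplicate` is illustrative, not from the codebase:

```go
// shouldReplicate mirrors the guard added to Replicator.Replicate: an entry
// written by a filer sink is tagged IsFromOtherCluster, so the notification
// it triggers in the target cluster is never replicated back to the source.
func shouldReplicate(message *filer_pb.EventNotification, sinkName string) bool {
	return !(message.IsFromOtherCluster && sinkName == "filer")
}
```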
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index c14566723..bde29176c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -3,41 +3,46 @@ package filersink
 
 import (
 	"context"
 	"fmt"
-	"strings"
 	"sync"
 
+	"google.golang.org/grpc"
+
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/chrislusf/seaweedfs/weed/security"
 )
 
-func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) {
 	if len(sourceChunks) == 0 {
 		return
 	}
+
+	replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
+
 	var wg sync.WaitGroup
-	for _, sourceChunk := range sourceChunks {
+	for chunkIndex, sourceChunk := range sourceChunks {
 		wg.Add(1)
-		go func(chunk *filer_pb.FileChunk) {
+		go func(chunk *filer_pb.FileChunk, index int) {
 			defer wg.Done()
-			replicatedChunk, e := fs.replicateOneChunk(chunk)
+			replicatedChunk, e := fs.replicateOneChunk(chunk, dir)
 			if e != nil {
 				err = e
 			}
-			replicatedChunks = append(replicatedChunks, replicatedChunk)
-		}(sourceChunk)
+			replicatedChunks[index] = replicatedChunk
+		}(sourceChunk, chunkIndex)
 	}
 	wg.Wait()
 
 	return
 }
 
-func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
+func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) {
 
-	fileId, err := fs.fetchAndWrite(sourceChunk)
+	fileId, err := fs.fetchAndWrite(sourceChunk, dir)
 	if err != nil {
-		return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
+		return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
 	}
 
 	return &filer_pb.FileChunk{
@@ -46,21 +51,24 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_
 		Size:         sourceChunk.Size,
 		Mtime:        sourceChunk.Mtime,
 		ETag:         sourceChunk.ETag,
-		SourceFileId: sourceChunk.FileId,
+		SourceFileId: sourceChunk.GetFileIdString(),
+		CipherKey:    sourceChunk.CipherKey,
+		IsCompressed: sourceChunk.IsCompressed,
 	}, nil
 }
 
-func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
+func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) {
 
-	filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId)
+	filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
 	if err != nil {
-		return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
+		return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
 	}
 	defer readCloser.Close()
 
 	var host string
+	var auth security.EncodedJwt
 
-	if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.AssignVolumeRequest{
 			Count:       1,
@@ -68,6 +76,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
 			Collection:  fs.collection,
 			TtlSec:      fs.ttlSec,
 			DataCenter:  fs.dataCenter,
+			ParentPath:  dir,
 		}
 
 		resp, err := client.AssignVolume(context.Background(), request)
@@ -75,8 +84,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
 			glog.V(0).Infof("assign volume failure %v: %v", request, err)
 			return err
 		}
+		if resp.Error != "" {
+			return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+		}
 
-		fileId, host = resp.FileId, resp.Url
+		fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
 
 		return nil
 	}); err != nil {
@@ -87,8 +99,8 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
 
 	glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
 
-	uploadResult, err := operation.Upload(fileUrl, filename, readCloser,
-		"gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, "")
+	// fetch data as is, regardless whether it is encrypted or not
+	uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
 	if err != nil {
 		glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
 		return "", fmt.Errorf("upload data: %v", err)
@@ -101,23 +113,16 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
 	return
 }
 
-func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&FilerSink{})
 
-	grpcConnection, err := util.GrpcDial(fs.grpcAddress)
-	if err != nil {
-		return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
-	}
-	defer grpcConnection.Close()
+func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
 
-	client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+	return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+		return fn(client)
+	}, fs.grpcAddress, fs.grpcDialOption)
 
-	return fn(client)
 }
-
-func volumeId(fileId string) string {
-	lastCommaIndex := strings.LastIndex(fileId, ",")
-	if lastCommaIndex > 0 {
-		return fileId[:lastCommaIndex]
-	}
-	return fileId
+func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
+	return hostAndPort
 }
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 2e9cc86d1..50721a8f3 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -4,6 +4,10 @@ import (
 	"context"
 	"fmt"
 
+	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/security"
+
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -13,13 +17,14 @@ import (
 )
 
 type FilerSink struct {
-	filerSource *source.FilerSource
-	grpcAddress string
-	dir         string
-	replication string
-	collection  string
-	ttlSec      int32
-	dataCenter  string
+	filerSource    *source.FilerSource
+	grpcAddress    string
+	dir            string
+	replication    string
+	collection     string
+	ttlSec         int32
+	dataCenter     string
+	grpcDialOption grpc.DialOption
 }
 
 func init() {
@@ -34,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string {
 	return fs.dir
 }
 
-func (fs *FilerSink) Initialize(configuration util.Configuration) error {
+func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
 	return fs.initialize(
-		configuration.GetString("grpcAddress"),
-		configuration.GetString("directory"),
-		configuration.GetString("replication"),
-		configuration.GetString("collection"),
-		configuration.GetInt("ttlSec"),
+		configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"), + configuration.GetString(prefix+"replication"), + configuration.GetString(prefix+"collection"), + configuration.GetInt(prefix+"ttlSec"), ) } @@ -55,37 +60,28 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { - return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - dir, name := filer2.FullPath(key).DirAndName() - request := &filer_pb.DeleteEntryRequest{ - Directory: dir, - Name: name, - IsDeleteData: deleteIncludeChunks, - } + dir, name := util.FullPath(key).DirAndName() - glog.V(1).Infof("delete entry: %v", request) - _, err := client.DeleteEntry(context.Background(), request) - if err != nil { - glog.V(0).Infof("delete entry %s: %v", key, err) - return fmt.Errorf("delete entry %s: %v", key, err) - } - - return nil - }) + glog.V(1).Infof("delete entry: %v", key) + err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true) + if err != nil { + glog.V(0).Infof("delete entry %s: %v", key, err) + return fmt.Errorf("delete entry %s: %v", key, err) + } + return nil } func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { - return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(key).DirAndName() - ctx := context.Background() + dir, name := util.FullPath(key).DirAndName() // look up existing entry lookupRequest := &filer_pb.LookupDirectoryEntryRequest{ @@ -93,14 +89,14 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { Name: name, } glog.V(1).Infof("lookup: %v", lookupRequest) - if resp, err := client.LookupDirectoryEntry(ctx, lookupRequest); err == nil { - if filer2.ETag(resp.Entry.Chunks) == filer2.ETag(entry.Chunks) { + if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil { + if filer2.ETag(resp.Entry) == filer2.ETag(entry) { glog.V(0).Infof("already replicated %s", key) return nil } } - replicatedChunks, err := fs.replicateChunks(entry.Chunks) + replicatedChunks, err := fs.replicateChunks(entry.Chunks, dir) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) @@ -117,10 +113,11 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { Attributes: entry.Attributes, Chunks: replicatedChunks, }, + IsFromOtherCluster: true, } glog.V(1).Infof("create: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { glog.V(0).Infof("create entry %s: %v", key, err) return fmt.Errorf("create entry %s: %v", key, err) } @@ -129,15 +126,13 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { }) } -func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { - - ctx := context.Background() +func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { - dir, name := filer2.FullPath(key).DirAndName() + dir, name := util.FullPath(key).DirAndName() // read existing entry var existingEntry *filer_pb.Entry - 
-	err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.LookupDirectoryEntryRequest{
 			Directory: dir,
@@ -145,7 +140,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
 		}
 
 		glog.V(4).Infof("lookup entry: %v", request)
-		resp, err := client.LookupDirectoryEntry(ctx, request)
+		resp, err := filer_pb.LookupEntry(client, request)
 		if err != nil {
 			glog.V(0).Infof("lookup %s: %v", key, err)
 			return err
@@ -166,7 +161,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
 		// skip if already changed
 		// this usually happens when the messages are not ordered
 		glog.V(0).Infof("late updates %s", key)
-	} else if filer2.ETag(newEntry.Chunks) == filer2.ETag(existingEntry.Chunks) {
+	} else if filer2.ETag(newEntry) == filer2.ETag(existingEntry) {
 		// skip if no change
 		// this usually happens when retrying the replication
 		glog.V(0).Infof("already replicated %s", key)
@@ -177,11 +172,11 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
 
 		// delete the chunks that are deleted from the source
 		if deleteIncludeChunks {
 			// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
-			existingEntry.Chunks = minusChunks(existingEntry.Chunks, deletedChunks)
+			existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
 		}
 
 		// replicate the chunks that are new in the source
-		replicatedChunks, err := fs.replicateChunks(newChunks)
+		replicatedChunks, err := fs.replicateChunks(newChunks, newParentPath)
 		if err != nil {
 			return true, fmt.Errorf("replicte %s chunks error: %v", key, err)
 		}
@@ -189,14 +184,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
 	}
 
 	// save updated meta data
-	return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.UpdateEntryRequest{
-			Directory: dir,
-			Entry:     existingEntry,
+			Directory:          newParentPath,
+			Entry:              existingEntry,
+			IsFromOtherCluster: true,
 		}
 
-		if _, err := client.UpdateEntry(ctx, request); err != nil {
+		if _, err := client.UpdateEntry(context.Background(), request); err != nil {
 			return fmt.Errorf("update existingEntry %s: %v", key, err)
 		}
 
@@ -205,23 +201,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry,
 }
 
 func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
-	deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks)
-	newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks)
-	return
-}
-
-func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
-	for _, a := range as {
-		found := false
-		for _, b := range bs {
-			if a.FileId == b.FileId {
-				found = true
-				break
-			}
-		}
-		if !found {
-			delta = append(delta, a)
-		}
-	}
+	deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
+	newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
 	return
 }
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index c1beefc33..bb5a54272 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -6,13 +6,14 @@ import (
 	"os"
 
 	"cloud.google.com/go/storage"
+	"google.golang.org/api/option"
+
 	"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/api/option" ) type GcsSink struct { @@ -34,11 +35,11 @@ func (g *GcsSink) GetSinkToDirectory() string { return g.dir } -func (g *GcsSink) Initialize(configuration util.Configuration) error { +func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } @@ -50,7 +51,6 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str g.bucket = bucketName g.dir = dir - ctx := context.Background() // Creates a client. if google_application_credentials == "" { var found bool @@ -59,7 +59,7 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") } } - client, err := storage.NewClient(ctx, option.WithCredentialsFile(google_application_credentials)) + client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials)) if err != nil { glog.Fatalf("Failed to create client: %v", err) } @@ -90,11 +90,9 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - - ctx := context.Background() + chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) - wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx) + wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) for _, chunk := range chunkViews { @@ -103,7 +101,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { return err } - _, err = util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) { + err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { wc.Write(data) }) @@ -121,7 +119,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *GcsSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { // TODO improve efficiency return false, nil } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index 0a86139d3..6d85f660a 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -8,10 +8,10 @@ import ( type ReplicationSink interface { GetName() string - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error CreateEntry(key string, entry *filer_pb.Entry) error - UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks 
-	UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
+	UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
 	GetSinkToDirectory() string
 	SetSourceFiler(s *source.FilerSource)
 }
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 0a4e78318..d7af105b8 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -1,6 +1,7 @@
 package S3Sink
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"sync"
@@ -10,6 +11,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -23,6 +25,7 @@ type S3Sink struct {
 	region      string
 	bucket      string
 	dir         string
+	endpoint    string
 	filerSource *source.FilerSource
 }
@@ -38,16 +41,18 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
 	return s3sink.dir
 }
 
-func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
-	glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region"))
-	glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket"))
-	glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory"))
+func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
+	glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
+	glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
+	glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
+	glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
 	return s3sink.initialize(
-		configuration.GetString("aws_access_key_id"),
-		configuration.GetString("aws_secret_access_key"),
-		configuration.GetString("region"),
-		configuration.GetString("bucket"),
-		configuration.GetString("directory"),
+		configuration.GetString(prefix+"aws_access_key_id"),
+		configuration.GetString(prefix+"aws_secret_access_key"),
+		configuration.GetString(prefix+"region"),
+		configuration.GetString(prefix+"bucket"),
+		configuration.GetString(prefix+"directory"),
+		configuration.GetString(prefix+"endpoint"),
 	)
 }
 
@@ -55,16 +60,18 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
 	s3sink.filerSource = s
 }
 
-func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, bucket, dir string) error {
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint string) error {
 	s3sink.region = region
 	s3sink.bucket = bucket
 	s3sink.dir = dir
+	s3sink.endpoint = endpoint
 
 	config := &aws.Config{
-		Region: aws.String(s3sink.region),
+		Region:   aws.String(s3sink.region),
+		Endpoint: aws.String(s3sink.endpoint),
 	}
-	if awsAccessKeyId != "" && aswSecretAccessKey != "" {
-		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
 	}
 
 	sess, err := session.NewSession(config)
@@ -89,7 +96,6 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
 }
 
 func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
-
 	key = cleanKey(key)
 
 	if entry.IsDirectory {
@@ -102,21 +108,22 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+
+	parts := make([]*s3.CompletedPart, len(chunkViews))
 
-	var parts []*s3.CompletedPart
 	var wg sync.WaitGroup
 	for chunkIndex, chunk := range chunkViews {
 		partId := chunkIndex + 1
 		wg.Add(1)
-		go func(chunk *filer2.ChunkView) {
+		go func(chunk *filer2.ChunkView, index int) {
 			defer wg.Done()
 			if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
 				err = uploadErr
 			} else {
-				parts = append(parts, part)
+				parts[index] = part
 			}
-		}(chunk)
+		}(chunk, chunkIndex)
 	}
 	wg.Wait()
 
@@ -125,11 +132,11 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return err
 	}
 
-	return s3sink.completeMultipartUpload(key, uploadId, parts)
+	return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)
 }
 
-func (s3sink *S3Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
 	key = cleanKey(key)
 	// TODO improve efficiency
 	return false, nil
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index 5c4be7aee..c5c65ed5c 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -2,6 +2,7 @@ package S3Sink
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 
@@ -81,7 +82,7 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
 }
 
 // To complete multipart upload
-func (s3sink *S3Sink) completeMultipartUpload(key, uploadId string, parts []*s3.CompletedPart) error {
+func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId string, parts []*s3.CompletedPart) error {
 	input := &s3.CompleteMultipartUploadInput{
 		Bucket:   aws.String(s3sink.bucket),
 		Key:      aws.String(key),
@@ -161,6 +162,6 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, e
 		return nil, err
 	}
 	buf := make([]byte, chunk.Size)
-	util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf, true)
+	util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
 	return bytes.NewReader(buf), nil
 }
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index efe71e706..69c23fe82 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -7,6 +7,11 @@ import (
 	"net/http"
 	"strings"
 
+	"google.golang.org/grpc"
+
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/security"
+
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -17,20 +22,22 @@ type ReplicationSource interface {
 }
 
 type FilerSource struct {
-	grpcAddress string
-	Dir         string
+	grpcAddress    string
+	grpcDialOption grpc.DialOption
+	Dir            string
 }
 
-func (fs *FilerSource) Initialize(configuration util.Configuration) error {
+func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
 	return fs.initialize(
-		configuration.GetString("grpcAddress"),
-		configuration.GetString("directory"),
+		configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"), ) } func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.Dir = dir + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } @@ -40,7 +47,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { vid := volumeId(part) - err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read lookup volume id locations: %v", vid) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ @@ -84,17 +91,19 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade return filename, header, readCloser, err } -func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +var _ = filer_pb.FilerClient(&FilerSource{}) - grpcConnection, err := util.GrpcDial(fs.grpcAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) - } - defer grpcConnection.Close() +func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, fs.grpcAddress, fs.grpcDialOption) - client := filer_pb.NewSeaweedFilerClient(grpcConnection) +} - return fn(client) +func (fs *FilerSource) AdjustedUrl(hostAndPort string) string { + return hostAndPort } func volumeId(fileId string) string { diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index f0100f4de..1dd386ba7 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -27,24 +27,24 @@ func (k *AwsSqsInput) GetName() string { return "aws_sqs" } -func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } -func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { +func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) { config := &aws.Config{ Region: aws.String(region), } - if awsAccessKeyId != "" && aswSecretAccessKey != "" { - config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + if awsAccessKeyId != "" && awsSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, 
+	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
 	}
 
 	sess, err := session.NewSession(config)
@@ -92,7 +92,9 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif
 	}
 
 	// process the message
-	key = *result.Messages[0].Attributes["key"]
+	// fmt.Printf("messages: %+v\n", result.Messages[0])
+	keyValue := result.Messages[0].MessageAttributes["key"]
+	key = *keyValue.StringValue
 	text := *result.Messages[0].Body
 	message = &filer_pb.EventNotification{}
 	err = proto.UnmarshalText(text, message)
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
new file mode 100644
index 000000000..9726096e5
--- /dev/null
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -0,0 +1,50 @@
+package sub
+
+import (
+	"context"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/golang/protobuf/proto"
+	"gocloud.dev/pubsub"
+	_ "gocloud.dev/pubsub/awssnssqs"
+	// _ "gocloud.dev/pubsub/azuresb"
+	_ "gocloud.dev/pubsub/gcppubsub"
+	_ "gocloud.dev/pubsub/natspubsub"
+	_ "gocloud.dev/pubsub/rabbitpubsub"
+)
+
+func init() {
+	NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
+}
+
+type GoCDKPubSubInput struct {
+	sub *pubsub.Subscription
+}
+
+func (k *GoCDKPubSubInput) GetName() string {
+	return "gocdk_pub_sub"
+}
+
+func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+	subURL := configuration.GetString(prefix + "sub_url")
+	glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
+	sub, err := pubsub.OpenSubscription(context.Background(), subURL)
+	if err != nil {
+		return err
+	}
+	k.sub = sub
+	return nil
+}
+
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+	msg, err := k.sub.Receive(context.Background())
+	key = msg.Metadata["key"]
+	message = &filer_pb.EventNotification{}
+	err = proto.Unmarshal(msg.Body, message)
+	if err != nil {
+		return "", nil, err
+	}
+	return key, message, nil
+}
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index ad6b42a2e..a950bb42b 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string {
 	return "google_pub_sub"
 }
 
-func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error {
-	glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
-	glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
+func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+	glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+	glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
 	return k.initialize(
-		configuration.GetString("google_application_credentials"),
-		configuration.GetString("project_id"),
-		configuration.GetString("topic"),
+		configuration.GetString(prefix+"google_application_credentials"),
+		configuration.GetString(prefix+"project_id"),
+		configuration.GetString(prefix+"topic"),
 	)
 }
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 1a86a8307..fa9cfad9b 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string {
 	return "kafka"
 }
 
-func (k *KafkaInput) Initialize(configuration util.Configuration) error {
-	glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
-	glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
+	glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+	glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
 	return k.initialize(
-		configuration.GetStringSlice("hosts"),
-		configuration.GetString("topic"),
-		configuration.GetString("offsetFile"),
-		configuration.GetInt("offsetSaveIntervalSeconds"),
+		configuration.GetStringSlice(prefix+"hosts"),
+		configuration.GetString(prefix+"topic"),
+		configuration.GetString(prefix+"offsetFile"),
+		configuration.GetInt(prefix+"offsetSaveIntervalSeconds"),
 	)
 }
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 66fbef824..8a2668f98 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -9,7 +9,7 @@ type NotificationInput interface {
 	// GetName gets the name to locate the configuration in sync.toml file
 	GetName() string
 	// Initialize initializes the file store
-	Initialize(configuration util.Configuration) error
+	Initialize(configuration util.Configuration, prefix string) error
 	ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
 }
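One pattern recurs through every file in this change: `Initialize` now takes a `prefix`, and each key is read as `prefix+name`, so a single configuration file can carry several independently scoped sections (the log lines above show prefixes such as `sink.s3.` and `replication.notification.kafka.`). A minimal, self-contained sketch of that convention; the `Config` type and the concrete key values are illustrative, not from the codebase:

```go
package main

import "fmt"

// Config stands in for util.Configuration in this sketch.
type Config map[string]string

func (c Config) GetString(key string) string { return c[key] }

// initializeSink reads its settings under the section prefix it is handed,
// the same way each sink's Initialize does after this change.
func initializeSink(conf Config, prefix string) (bucket, region string) {
	return conf.GetString(prefix + "bucket"), conf.GetString(prefix + "region")
}

func main() {
	conf := Config{
		"sink.s3.bucket": "backup-bucket",
		"sink.s3.region": "us-east-1",
	}
	bucket, region := initializeSink(conf, "sink.s3.")
	fmt.Println(bucket, region) // backup-bucket us-east-1
}
```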
