diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 14 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 26 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 31 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 35 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 47 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 28 | ||||
| -rw-r--r-- | weed/replication/sink/replication_sink.go | 9 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 29 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_write.go | 10 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 30 | ||||
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 14 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 4 | ||||
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 12 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 14 | ||||
| -rw-r--r-- | weed/replication/sub/notifications.go | 2 |
15 files changed, 157 insertions, 148 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 7353cdc91..a91c2ddd3 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -18,10 +18,10 @@ type Replicator struct { source *source.FilerSource } -func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator { +func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator { source := &source.FilerSource{} - source.Initialize(sourceConfig) + source.Initialize(sourceConfig, configPrefix) dataSink.SetSourceFiler(source) @@ -41,28 +41,28 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p key = newKey if message.OldEntry != nil && message.NewEntry == nil { glog.V(4).Infof("deleting %v", key) - return r.sink.DeleteEntry(ctx, key, message.OldEntry.IsDirectory, message.DeleteChunks) + return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks) } if message.OldEntry == nil && message.NewEntry != nil { glog.V(4).Infof("creating %v", key) - return r.sink.CreateEntry(ctx, key, message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry) } if message.OldEntry == nil && message.NewEntry == nil { glog.V(0).Infof("weird message %+v", message) return nil } - foundExisting, err := r.sink.UpdateEntry(ctx, key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks) + foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks) if foundExisting { glog.V(4).Infof("updated %v", key) return err } - err = r.sink.DeleteEntry(ctx, key, message.OldEntry.IsDirectory, false) + err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false) if err != nil { return fmt.Errorf("delete old entry %v: %v", key, err) } glog.V(4).Infof("creating missing %v", key) - return r.sink.CreateEntry(ctx, key, message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry) } diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 6381908a1..89e04922f 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string { return g.dir } -func (g *AzureSink) Initialize(configuration util.Configuration) error { +func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("account_name"), - configuration.GetString("account_key"), - configuration.GetString("container"), - configuration.GetString("directory"), + configuration.GetString(prefix+"account_name"), + configuration.GetString(prefix+"account_key"), + configuration.GetString(prefix+"container"), + configuration.GetString(prefix+"directory"), ) } @@ -70,7 +70,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e return nil } -func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { +func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -78,7 +78,7 @@ func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, de key = key + "/" } - if _, err := g.containerURL.NewBlobURL(key).Delete(ctx, + if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(), azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil { return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err) } @@ -87,7 +87,7 @@ func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, de } -func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { +func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -102,21 +102,21 @@ func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb // Azure Storage account's container. appendBlobURL := g.containerURL.NewAppendBlobURL(key) - _, err := appendBlobURL.Create(ctx, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}) + _, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}) if err != nil { return err } for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) if err != nil { return err } var writeErr error - _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) { - _, writeErr = appendBlobURL.AppendBlock(ctx, bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil) + readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) { + _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil) }) if readErr != nil { @@ -132,7 +132,7 @@ func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb } -func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 35c2230fa..df0653f73 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string { return g.dir } -func (g *B2Sink) Initialize(configuration util.Configuration) error { +func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("b2_account_id"), - configuration.GetString("b2_master_application_key"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"b2_account_id"), + configuration.GetString(prefix+"b2_master_application_key"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } @@ -45,8 +45,7 @@ func (g *B2Sink) SetSourceFiler(s *source.FilerSource) { } func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { - ctx := context.Background() - client, err := b2.NewClient(ctx, accountId, accountKey) + client, err := b2.NewClient(context.Background(), accountId, accountKey) if err != nil { return err } @@ -58,7 +57,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { return nil } -func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { +func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -66,18 +65,18 @@ func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, delet key = key + "/" } - bucket, err := g.client.Bucket(ctx, g.bucket) + bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { return err } targetObject := bucket.Object(key) - return targetObject.Delete(ctx) + return targetObject.Delete(context.Background()) } -func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { +func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -88,23 +87,23 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En totalSize := filer2.TotalSize(entry.Chunks) chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - bucket, err := g.client.Bucket(ctx, g.bucket) + bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { return err } targetObject := bucket.Object(key) - writer := targetObject.NewWriter(ctx) + writer := targetObject.NewWriter(context.Background()) for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) if err != nil { return err } var writeErr error - _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) { + readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) { _, err := writer.Write(data) if err != nil { writeErr = err @@ -124,7 +123,7 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En } -func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 97e9671a3..07218b9b3 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,18 +3,19 @@ package filersink import ( "context" "fmt" - "google.golang.org/grpc" "strings" "sync" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } @@ -23,7 +24,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ wg.Add(1) go func(chunk *filer_pb.FileChunk) { defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(ctx, chunk) + replicatedChunk, e := fs.replicateOneChunk(chunk, dir) if e != nil { err = e } @@ -35,9 +36,9 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ return } -func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) { - fileId, err := fs.fetchAndWrite(ctx, sourceChunk) + fileId, err := fs.fetchAndWrite(sourceChunk, dir) if err != nil { return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err) } @@ -49,12 +50,14 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p Mtime: sourceChunk.Mtime, ETag: sourceChunk.ETag, SourceFileId: sourceChunk.GetFileIdString(), + CipherKey: sourceChunk.CipherKey, + IsGzipped: sourceChunk.IsGzipped, }, nil } -func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) { +func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { - filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString()) + filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) if err != nil { return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) } @@ -63,7 +66,7 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi var host string var auth security.EncodedJwt - if err := fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -71,13 +74,17 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi Collection: fs.collection, TtlSec: fs.ttlSec, DataCenter: fs.dataCenter, + ParentPath: dir, } - resp, err := client.AssignVolume(ctx, request) + resp, err := client.AssignVolume(context.Background(), request) if err != nil { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) @@ -90,8 +97,8 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) - uploadResult, err := operation.Upload(fileUrl, filename, readCloser, - "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) + // fetch data as is, regardless whether it is encrypted or not + uploadResult, err := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) return "", fmt.Errorf("upload data: %v", err) @@ -104,9 +111,9 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi return } -func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index f99c7fdf6..838c2c441 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -3,10 +3,11 @@ package filersink import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -38,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string { return fs.dir } -func (fs *FilerSink) Initialize(configuration util.Configuration) error { +func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), - configuration.GetString("replication"), - configuration.GetString("collection"), - configuration.GetInt("ttlSec"), + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), + configuration.GetString(prefix+"replication"), + configuration.GetString(prefix+"collection"), + configuration.GetInt(prefix+"ttlSec"), ) } @@ -59,12 +60,12 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } -func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { - return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { +func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -75,7 +76,7 @@ func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, d } glog.V(1).Infof("delete entry: %v", request) - _, err := client.DeleteEntry(ctx, request) + _, err := client.DeleteEntry(context.Background(), request) if err != nil { glog.V(0).Infof("delete entry %s: %v", key, err) return fmt.Errorf("delete entry %s: %v", key, err) @@ -85,9 +86,9 @@ func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, d }) } -func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { +func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { - return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -97,14 +98,14 @@ func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_p Name: name, } glog.V(1).Infof("lookup: %v", lookupRequest) - if resp, err := client.LookupDirectoryEntry(ctx, lookupRequest); err == nil { + if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil { if filer2.ETag(resp.Entry.Chunks) == filer2.ETag(entry.Chunks) { glog.V(0).Infof("already replicated %s", key) return nil } } - replicatedChunks, err := fs.replicateChunks(ctx, entry.Chunks) + replicatedChunks, err := fs.replicateChunks(entry.Chunks, dir) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) @@ -124,7 +125,7 @@ func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_p } glog.V(1).Infof("create: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { glog.V(0).Infof("create entry %s: %v", key, err) return fmt.Errorf("create entry %s: %v", key, err) } @@ -133,13 +134,13 @@ func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_p }) } -func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { dir, name := filer2.FullPath(key).DirAndName() // read existing entry var existingEntry *filer_pb.Entry - err = fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -147,7 +148,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file } glog.V(4).Infof("lookup entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) + resp, err := filer_pb.LookupEntry(client, request) if err != nil { glog.V(0).Infof("lookup %s: %v", key, err) return err @@ -183,7 +184,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file } // replicate the chunks that are new in the source - replicatedChunks, err := fs.replicateChunks(ctx, newChunks) + replicatedChunks, err := fs.replicateChunks(newChunks, newParentPath) if err != nil { return true, fmt.Errorf("replicte %s chunks error: %v", key, err) } @@ -191,14 +192,14 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file } // save updated meta data - return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: newParentPath, Entry: existingEntry, } - if _, err := client.UpdateEntry(ctx, request); err != nil { + if _, err := client.UpdateEntry(context.Background(), request); err != nil { return fmt.Errorf("update existingEntry %s: %v", key, err) } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index abd7c49b9..694399274 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -6,13 +6,14 @@ import ( "os" "cloud.google.com/go/storage" + "google.golang.org/api/option" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/api/option" ) type GcsSink struct { @@ -34,11 +35,11 @@ func (g *GcsSink) GetSinkToDirectory() string { return g.dir } -func (g *GcsSink) Initialize(configuration util.Configuration) error { +func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } @@ -50,7 +51,6 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str g.bucket = bucketName g.dir = dir - ctx := context.Background() // Creates a client. if google_application_credentials == "" { var found bool @@ -59,7 +59,7 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") } } - client, err := storage.NewClient(ctx, option.WithCredentialsFile(google_application_credentials)) + client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials)) if err != nil { glog.Fatalf("Failed to create client: %v", err) } @@ -69,13 +69,13 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str return nil } -func (g *GcsSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { +func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { if isDirectory { key = key + "/" } - if err := g.client.Bucket(g.bucket).Object(key).Delete(ctx); err != nil { + if err := g.client.Bucket(g.bucket).Object(key).Delete(context.Background()); err != nil { return fmt.Errorf("gcs delete %s%s: %v", g.bucket, key, err) } @@ -83,7 +83,7 @@ func (g *GcsSink) DeleteEntry(ctx context.Context, key string, isDirectory, dele } -func (g *GcsSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { +func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { if entry.IsDirectory { return nil @@ -92,16 +92,16 @@ func (g *GcsSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.E totalSize := filer2.TotalSize(entry.Chunks) chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx) + wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) if err != nil { return err } - _, err = util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) { + err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) { wc.Write(data) }) @@ -119,7 +119,7 @@ func (g *GcsSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.E } -func (g *GcsSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { // TODO improve efficiency return false, nil } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index dd54f0005..6d85f660a 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -1,7 +1,6 @@ package sink import ( - "context" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" @@ -9,10 +8,10 @@ import ( type ReplicationSink interface { GetName() string - Initialize(configuration util.Configuration) error - DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error - CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error - UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) + Initialize(configuration util.Configuration, prefix string) error + DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error + CreateEntry(key string, entry *filer_pb.Entry) error + UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) } diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 4cff341d0..5f548559b 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -39,16 +40,16 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { return s3sink.dir } -func (s3sink *S3Sink) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region")) - glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket")) - glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory")) +func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) + glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) return s3sink.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } @@ -77,7 +78,7 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc return nil } -func (s3sink *S3Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { +func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -89,7 +90,7 @@ func (s3sink *S3Sink) DeleteEntry(ctx context.Context, key string, isDirectory, } -func (s3sink *S3Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { +func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -112,7 +113,7 @@ func (s3sink *S3Sink) CreateEntry(ctx context.Context, key string, entry *filer_ wg.Add(1) go func(chunk *filer2.ChunkView) { defer wg.Done() - if part, uploadErr := s3sink.uploadPart(ctx, key, uploadId, partId, chunk); uploadErr != nil { + if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { err = uploadErr } else { parts = append(parts, part) @@ -126,11 +127,11 @@ func (s3sink *S3Sink) CreateEntry(ctx context.Context, key string, entry *filer_ return err } - return s3sink.completeMultipartUpload(ctx, key, uploadId, parts) + return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts) } -func (s3sink *S3Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index 0a190b27d..854688b1e 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -103,10 +103,10 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId } // To upload a part -func (s3sink *S3Sink) uploadPart(ctx context.Context, key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) { +func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) { var readSeeker io.ReadSeeker - readSeeker, err := s3sink.buildReadSeeker(ctx, chunk) + readSeeker, err := s3sink.buildReadSeeker(chunk) if err != nil { glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) @@ -156,12 +156,12 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou return err } -func (s3sink *S3Sink) buildReadSeeker(ctx context.Context, chunk *filer2.ChunkView) (io.ReadSeeker, error) { - fileUrl, err := s3sink.filerSource.LookupFileId(ctx, chunk.FileId) +func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, error) { + fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId) if err != nil { return nil, err } buf := make([]byte, chunk.Size) - util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf, true) + util.ReadUrl(fileUrl, nil, false,false, chunk.Offset, int(chunk.Size), buf) return bytes.NewReader(buf), nil } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index d7b5ebc4d..90bcffdf0 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -3,13 +3,15 @@ package source import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "google.golang.org/grpc" "io" "net/http" "strings" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -25,30 +27,30 @@ type FilerSource struct { Dir string } -func (fs *FilerSource) Initialize(configuration util.Configuration) error { +func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), ) } func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.Dir = dir - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } -func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl string, err error) { +func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { vid2Locations := make(map[string]*filer_pb.Locations) vid := volumeId(part) - err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read lookup volume id locations: %v", vid) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) if err != nil { @@ -77,9 +79,9 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl s return } -func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { +func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { - fileUrl, err := fs.LookupFileId(ctx, part) + fileUrl, err := fs.LookupFileId(part) if err != nil { return "", nil, nil, err } @@ -89,9 +91,9 @@ func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename stri return filename, header, readCloser, err } -func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index bed26c79c..06869e619 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -27,14 +27,14 @@ func (k *AwsSqsInput) GetName() string { return "aws_sqs" } -func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index eddba9ff8..9726096e5 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -27,8 +27,8 @@ func (k *GoCDKPubSubInput) GetName() string { return "gocdk_pub_sub" } -func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error { - subURL := config.GetString("sub_url") +func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { + subURL := configuration.GetString(prefix + "sub_url") glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) sub, err := pubsub.OpenSubscription(context.Background(), subURL) if err != nil { diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index ad6b42a2e..a950bb42b 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string { return "google_pub_sub" } -func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index 1a86a8307..fa9cfad9b 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string { return "kafka" } -func (k *KafkaInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), - configuration.GetString("offsetFile"), - configuration.GetInt("offsetSaveIntervalSeconds"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), + configuration.GetString(prefix+"offsetFile"), + configuration.GetInt(prefix+"offsetSaveIntervalSeconds"), ) } diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go index 66fbef824..8a2668f98 100644 --- a/weed/replication/sub/notifications.go +++ b/weed/replication/sub/notifications.go @@ -9,7 +9,7 @@ type NotificationInput interface { // GetName gets the name to locate the configuration in sync.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) } |
