diff options
Diffstat (limited to 'weed/replication/sink/b2sink/b2_sink.go')
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 31 |
1 files changed, 15 insertions, 16 deletions
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 35c2230fa..df0653f73 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string { return g.dir } -func (g *B2Sink) Initialize(configuration util.Configuration) error { +func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("b2_account_id"), - configuration.GetString("b2_master_application_key"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"b2_account_id"), + configuration.GetString(prefix+"b2_master_application_key"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } @@ -45,8 +45,7 @@ func (g *B2Sink) SetSourceFiler(s *source.FilerSource) { } func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { - ctx := context.Background() - client, err := b2.NewClient(ctx, accountId, accountKey) + client, err := b2.NewClient(context.Background(), accountId, accountKey) if err != nil { return err } @@ -58,7 +57,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { return nil } -func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { +func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -66,18 +65,18 @@ func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, delet key = key + "/" } - bucket, err := g.client.Bucket(ctx, g.bucket) + bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { return err } targetObject := bucket.Object(key) - return targetObject.Delete(ctx) + return targetObject.Delete(context.Background()) } -func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { +func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -88,23 +87,23 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En totalSize := filer2.TotalSize(entry.Chunks) chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - bucket, err := g.client.Bucket(ctx, g.bucket) + bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { return err } targetObject := bucket.Object(key) - writer := targetObject.NewWriter(ctx) + writer := targetObject.NewWriter(context.Background()) for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) if err != nil { return err } var writeErr error - _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) { + readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) { _, err := writer.Write(data) if err != nil { writeErr = err @@ -124,7 +123,7 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En } -func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) |
