aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sink/b2sink/b2_sink.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/sink/b2sink/b2_sink.go')
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go31
1 files changed, 15 insertions, 16 deletions
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 35c2230fa..df0653f73 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
-func (g *B2Sink) Initialize(configuration util.Configuration) error {
+func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("b2_account_id"),
- configuration.GetString("b2_master_application_key"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"b2_account_id"),
+ configuration.GetString(prefix+"b2_master_application_key"),
+ configuration.GetString(prefix+"bucket"),
+ configuration.GetString(prefix+"directory"),
)
}
@@ -45,8 +45,7 @@ func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
}
func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
- ctx := context.Background()
- client, err := b2.NewClient(ctx, accountId, accountKey)
+ client, err := b2.NewClient(context.Background(), accountId, accountKey)
if err != nil {
return err
}
@@ -58,7 +57,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
return nil
}
-func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
+func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
key = cleanKey(key)
@@ -66,18 +65,18 @@ func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, delet
key = key + "/"
}
- bucket, err := g.client.Bucket(ctx, g.bucket)
+ bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil {
return err
}
targetObject := bucket.Object(key)
- return targetObject.Delete(ctx)
+ return targetObject.Delete(context.Background())
}
-func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
+func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
key = cleanKey(key)
@@ -88,23 +87,23 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En
totalSize := filer2.TotalSize(entry.Chunks)
chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
- bucket, err := g.client.Bucket(ctx, g.bucket)
+ bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil {
return err
}
targetObject := bucket.Object(key)
- writer := targetObject.NewWriter(ctx)
+ writer := targetObject.NewWriter(context.Background())
for _, chunk := range chunkViews {
- fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId)
+ fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return err
}
var writeErr error
- _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+ readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
_, err := writer.Write(data)
if err != nil {
writeErr = err
@@ -124,7 +123,7 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En
}
-func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
key = cleanKey(key)