aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sink/azuresink/azure_sink.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/sink/azuresink/azure_sink.go')
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go26
1 files changed, 13 insertions, 13 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index 6381908a1..89e04922f 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
-func (g *AzureSink) Initialize(configuration util.Configuration) error {
+func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
return g.initialize(
- configuration.GetString("account_name"),
- configuration.GetString("account_key"),
- configuration.GetString("container"),
- configuration.GetString("directory"),
+ configuration.GetString(prefix+"account_name"),
+ configuration.GetString(prefix+"account_key"),
+ configuration.GetString(prefix+"container"),
+ configuration.GetString(prefix+"directory"),
)
}
@@ -70,7 +70,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
return nil
}
-func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
+func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
key = cleanKey(key)
@@ -78,7 +78,7 @@ func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, de
key = key + "/"
}
- if _, err := g.containerURL.NewBlobURL(key).Delete(ctx,
+ if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(),
azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
}
@@ -87,7 +87,7 @@ func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, de
}
-func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
+func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
key = cleanKey(key)
@@ -102,21 +102,21 @@ func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb
// Azure Storage account's container.
appendBlobURL := g.containerURL.NewAppendBlobURL(key)
- _, err := appendBlobURL.Create(ctx, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
+ _, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
if err != nil {
return err
}
for _, chunk := range chunkViews {
- fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId)
+ fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return err
}
var writeErr error
- _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
- _, writeErr = appendBlobURL.AppendBlock(ctx, bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
+ readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk, chunk.Offset, int(chunk.Size), func(data []byte) {
+ _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
})
if readErr != nil {
@@ -132,7 +132,7 @@ func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb
}
-func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
key = cleanKey(key)
// TODO improve efficiency
return false, nil