about summary refs log tree commit diff
path: root/weed/replication/sink/azuresink/azure_sink.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/sink/azuresink/azure_sink.go')
-rw-r--r--	weed/replication/sink/azuresink/azure_sink.go	154
1 file changed, 114 insertions, 40 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index b0e40e1a7..8eb2218e7 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -3,14 +3,9 @@ package azuresink
import (
"bytes"
"context"
- "errors"
"fmt"
- "net/http"
"strings"
- "time"
- "github.com/Azure/azure-sdk-for-go/sdk/azcore"
- "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -20,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/remote_storage/azure"
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -75,22 +71,25 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
}
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
- client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
- ClientOptions: azcore.ClientOptions{
- Retry: policy.RetryOptions{
- MaxRetries: 10, // Increased from default 3 for replication sink resiliency
- TryTimeout: time.Minute,
- RetryDelay: 2 * time.Second,
- MaxRetryDelay: time.Minute,
- },
- },
- })
+ client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, azure.DefaultAzBlobClientOptions())
if err != nil {
return fmt.Errorf("failed to create Azure client: %w", err)
}
g.client = client
+ // Validate that the container exists early to catch configuration errors
+ containerClient := client.ServiceClient().NewContainerClient(container)
+ ctxValidate, cancelValidate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelValidate()
+ _, err = containerClient.GetProperties(ctxValidate, nil)
+ if err != nil {
+ if bloberror.HasCode(err, bloberror.ContainerNotFound) {
+ return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container)
+ }
+ return fmt.Errorf("failed to validate Azure container '%s': %w", container, err)
+ }
+
return nil
}
@@ -103,7 +102,9 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo
}
blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
- _, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{
+ ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelDelete()
+ _, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{
DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
})
if err != nil {
@@ -131,35 +132,34 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
// Create append blob client
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
- // Create blob with access conditions
- accessConditions := &blob.AccessConditions{}
- if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
- modifiedTime := time.Unix(entry.Attributes.Mtime, 0)
- accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{
- IfUnmodifiedSince: &modifiedTime,
- }
- }
-
- _, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{
- AccessConditions: accessConditions,
- })
+ // Try to create the blob first (without access conditions for initial creation)
+ ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelCreate()
+ _, err := appendBlobClient.Create(ctxCreate, nil)
+ needsWrite := true
if err != nil {
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
- // Blob already exists, which is fine for an append blob - we can append to it
- } else {
- // Check if this is a precondition failed error (HTTP 412)
- var respErr *azcore.ResponseError
- if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed {
- glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key)
- return nil
+ // Handle existing blob - check if overwrite is needed and perform it if necessary
+ var handleErr error
+ needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize)
+ if handleErr != nil {
+ return handleErr
}
+ } else {
return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
}
}
+ // If we don't need to write (blob is up-to-date), return early
+ if !needsWrite {
+ return nil
+ }
+
writeFunc := func(data []byte) error {
- _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
+ ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelWrite()
+ _, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
return writeErr
}
@@ -174,14 +174,88 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
return nil
}
+// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary.
+// It returns:
+// - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date
+// - error: any error encountered during the operation
+func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) {
+ // Get the blob's properties to decide whether to overwrite.
+ // Use a timeout to fail fast on network issues.
+ ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelProps()
+ props, propErr := appendBlobClient.GetProperties(ctxProps, nil)
+
+ // Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state.
+ if propErr != nil {
+ return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr)
+ }
+
+ // Check if we can skip writing based on modification time and size.
+ if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil {
+ const clockSkewTolerance = int64(2) // seconds - allow small clock differences
+ remoteMtime := props.LastModified.Unix()
+ localMtime := entry.Attributes.Mtime
+ // Skip if remote is newer/same (within skew tolerance) and has the SAME size.
+ // This prevents skipping partial/corrupted files that may have a newer mtime.
+ if remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == int64(totalSize) {
+ glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
+ g.container, key, remoteMtime, localMtime, *props.ContentLength)
+ return false, nil
+ }
+ }
+
+ // Blob is empty or outdated - we need to delete and recreate it.
+ // REQUIRE ETag for conditional delete to avoid race conditions and data loss.
+ if props.ETag == nil {
+ return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key)
+ }
+
+ deleteOpts := &blob.DeleteOptions{
+ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
+ AccessConditions: &blob.AccessConditions{
+ ModifiedAccessConditions: &blob.ModifiedAccessConditions{
+ IfMatch: props.ETag,
+ },
+ },
+ }
+
+ // Delete existing blob with conditional delete and timeout.
+ ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelDel()
+ _, delErr := appendBlobClient.Delete(ctxDel, deleteOpts)
+
+ if delErr != nil {
+ // If the precondition fails, the blob was modified by another process after we checked it.
+ // Failing here is safe; replication will retry.
+ if bloberror.HasCode(delErr, bloberror.ConditionNotMet) {
+ return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
+ }
+ // Ignore BlobNotFound, as the goal is to delete it anyway.
+ if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
+ return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
+ }
+ }
+
+ // Recreate the blob with timeout.
+ ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+ defer cancelRecreate()
+ _, createErr := appendBlobClient.Create(ctxRecreate, nil)
+
+ if createErr != nil {
+ // It's possible another process recreated it after our delete.
+ // Failing is safe, as a retry of the whole function will handle it.
+ return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
+ }
+
+ return true, nil
+}
+
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
key = cleanKey(key)
return true, g.CreateEntry(key, newEntry, signatures)
}
func cleanKey(key string) string {
- if strings.HasPrefix(key, "/") {
- key = key[1:]
- }
- return key
+ // Remove all leading slashes (TrimLeft handles multiple slashes, unlike TrimPrefix)
+ return strings.TrimLeft(key, "/")
}