diff options
Diffstat (limited to 'weed/replication/sink/azuresink/azure_sink.go')
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 154 |
1 file changed, 114 insertions, 40 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index b0e40e1a7..8eb2218e7 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -3,14 +3,9 @@ package azuresink import ( "bytes" "context" - "errors" "fmt" - "net/http" "strings" - "time" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" @@ -20,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/remote_storage/azure" "github.com/seaweedfs/seaweedfs/weed/replication/repl_util" "github.com/seaweedfs/seaweedfs/weed/replication/sink" "github.com/seaweedfs/seaweedfs/weed/replication/source" @@ -75,22 +71,25 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e } serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName) - client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ - ClientOptions: azcore.ClientOptions{ - Retry: policy.RetryOptions{ - MaxRetries: 10, // Increased from default 3 for replication sink resiliency - TryTimeout: time.Minute, - RetryDelay: 2 * time.Second, - MaxRetryDelay: time.Minute, - }, - }, - }) + client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, azure.DefaultAzBlobClientOptions()) if err != nil { return fmt.Errorf("failed to create Azure client: %w", err) } g.client = client + // Validate that the container exists early to catch configuration errors + containerClient := client.ServiceClient().NewContainerClient(container) + ctxValidate, cancelValidate := context.WithTimeout(context.Background(), 
azure.DefaultAzureOpTimeout) + defer cancelValidate() + _, err = containerClient.GetProperties(ctxValidate, nil) + if err != nil { + if bloberror.HasCode(err, bloberror.ContainerNotFound) { + return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container) + } + return fmt.Errorf("failed to validate Azure container '%s': %w", container, err) + } + return nil } @@ -103,7 +102,9 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo } blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key) - _, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{ + ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelDelete() + _, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), }) if err != nil { @@ -131,35 +132,34 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] // Create append blob client appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) - // Create blob with access conditions - accessConditions := &blob.AccessConditions{} - if entry.Attributes != nil && entry.Attributes.Mtime > 0 { - modifiedTime := time.Unix(entry.Attributes.Mtime, 0) - accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{ - IfUnmodifiedSince: &modifiedTime, - } - } - - _, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{ - AccessConditions: accessConditions, - }) + // Try to create the blob first (without access conditions for initial creation) + ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelCreate() + _, err := appendBlobClient.Create(ctxCreate, nil) + needsWrite := true if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { - // Blob already exists, which is 
fine for an append blob - we can append to it - } else { - // Check if this is a precondition failed error (HTTP 412) - var respErr *azcore.ResponseError - if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed { - glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key) - return nil + // Handle existing blob - check if overwrite is needed and perform it if necessary + var handleErr error + needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize) + if handleErr != nil { + return handleErr } + } else { return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) } } + // If we don't need to write (blob is up-to-date), return early + if !needsWrite { + return nil + } + writeFunc := func(data []byte) error { - _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) + ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelWrite() + _, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) return writeErr } @@ -174,14 +174,88 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] return nil } +// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary. +// It returns: +// - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date +// - error: any error encountered during the operation +func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) { + // Get the blob's properties to decide whether to overwrite. + // Use a timeout to fail fast on network issues. 
+ ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelProps() + props, propErr := appendBlobClient.GetProperties(ctxProps, nil) + + // Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state. + if propErr != nil { + return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr) + } + + // Check if we can skip writing based on modification time and size. + if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil { + const clockSkewTolerance = int64(2) // seconds - allow small clock differences + remoteMtime := props.LastModified.Unix() + localMtime := entry.Attributes.Mtime + // Skip if remote is newer/same (within skew tolerance) and has the SAME size. + // This prevents skipping partial/corrupted files that may have a newer mtime. + if remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == int64(totalSize) { + glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", + g.container, key, remoteMtime, localMtime, *props.ContentLength) + return false, nil + } + } + + // Blob is empty or outdated - we need to delete and recreate it. + // REQUIRE ETag for conditional delete to avoid race conditions and data loss. + if props.ETag == nil { + return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key) + } + + deleteOpts := &blob.DeleteOptions{ + DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), + AccessConditions: &blob.AccessConditions{ + ModifiedAccessConditions: &blob.ModifiedAccessConditions{ + IfMatch: props.ETag, + }, + }, + } + + // Delete existing blob with conditional delete and timeout. 
+ ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelDel() + _, delErr := appendBlobClient.Delete(ctxDel, deleteOpts) + + if delErr != nil { + // If the precondition fails, the blob was modified by another process after we checked it. + // Failing here is safe; replication will retry. + if bloberror.HasCode(delErr, bloberror.ConditionNotMet) { + return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr) + } + // Ignore BlobNotFound, as the goal is to delete it anyway. + if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { + return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) + } + } + + // Recreate the blob with timeout. + ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelRecreate() + _, createErr := appendBlobClient.Create(ctxRecreate, nil) + + if createErr != nil { + // It's possible another process recreated it after our delete. + // Failing is safe, as a retry of the whole function will handle it. + return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr) + } + + return true, nil +} + func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { key = cleanKey(key) return true, g.CreateEntry(key, newEntry, signatures) } func cleanKey(key string) string { - if strings.HasPrefix(key, "/") { - key = key[1:] - } - return key + // Remove all leading slashes (TrimLeft handles multiple slashes, unlike TrimPrefix) + return strings.TrimLeft(key, "/") } |
