aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sink/azuresink/azure_sink.go
blob: 8eb2218e73af470b00db80cf7336d6952651c8d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package azuresink

import (
	"bytes"
	"context"
	"fmt"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage/azure"
	"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// AzureSink replicates filer entries into an Azure Blob Storage container.
// It is registered as a replication sink in init() below.
type AzureSink struct {
	client        *azblob.Client      // Azure blob service client, built in initialize()
	container     string              // destination container name
	dir           string              // destination directory prefix inside the container
	filerSource   *source.FilerSource // source filer used to look up chunk data when writing
	isIncremental bool                // whether this sink stores changes incrementally (see IsIncremental)
}

// init registers an AzureSink instance with the global list of available
// replication sinks so it can be selected by configuration.
func init() {
	sink.Sinks = append(sink.Sinks, &AzureSink{})
}

// GetName returns the identifier used to select this sink in configuration.
func (g *AzureSink) GetName() string {
	const sinkName = "azure"
	return sinkName
}

// GetSinkToDirectory reports the destination directory prefix configured
// for this sink.
func (g *AzureSink) GetSinkToDirectory() string {
	targetDir := g.dir
	return targetDir
}

// IsIncremental reports whether the sink was configured for incremental
// replication (the "is_incremental" setting).
func (g *AzureSink) IsIncremental() bool {
	incremental := g.isIncremental
	return incremental
}

// Initialize reads this sink's settings from the given configuration
// section (keys under prefix) and establishes the Azure client.
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
	g.isIncremental = configuration.GetBool(prefix + "is_incremental")

	accountName := configuration.GetString(prefix + "account_name")
	accountKey := configuration.GetString(prefix + "account_key")
	containerName := configuration.GetString(prefix + "container")
	directory := configuration.GetString(prefix + "directory")

	return g.initialize(accountName, accountKey, containerName, directory)
}

// SetSourceFiler wires up the filer source this sink uses to resolve and
// read chunk data during replication.
func (g *AzureSink) SetSourceFiler(s *source.FilerSource) {
	g.filerSource = s
}

// initialize records the target container/directory, builds the Azure blob
// client from shared-key credentials, and probes the container once so
// configuration mistakes surface at startup instead of on first write.
func (g *AzureSink) initialize(accountName, accountKey, container, dir string) error {
	g.container = container
	g.dir = dir

	// Build a shared-key credential for the configured storage account.
	credential, credErr := azblob.NewSharedKeyCredential(accountName, accountKey)
	if credErr != nil {
		return fmt.Errorf("failed to create Azure credential with account name:%s: %w", accountName, credErr)
	}

	endpoint := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
	blobClient, clientErr := azblob.NewClientWithSharedKeyCredential(endpoint, credential, azure.DefaultAzBlobClientOptions())
	if clientErr != nil {
		return fmt.Errorf("failed to create Azure client: %w", clientErr)
	}
	g.client = blobClient

	// Validate early that the container exists; a bounded context keeps the
	// probe from hanging on network issues.
	ctx, cancel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancel()
	if _, probeErr := blobClient.ServiceClient().NewContainerClient(container).GetProperties(ctx, nil); probeErr != nil {
		if bloberror.HasCode(probeErr, bloberror.ContainerNotFound) {
			return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container)
		}
		return fmt.Errorf("failed to validate Azure container '%s': %w", container, probeErr)
	}

	return nil
}

// DeleteEntry removes the blob for key (snapshots included) from the
// configured container. Directory entries use a trailing-slash marker.
// A missing blob is treated as success so deletion is idempotent.
func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
	key = cleanKey(key)
	if isDirectory {
		// Directory markers carry a trailing slash in the blob namespace.
		key += "/"
	}

	ctx, cancel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancel()

	target := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
	opts := &blob.DeleteOptions{
		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
	}
	if _, err := target.Delete(ctx, opts); err != nil {
		// Idempotent delete: a blob that is already gone is not an error.
		if bloberror.HasCode(err, bloberror.BlobNotFound) {
			return nil
		}
		return fmt.Errorf("azure delete %s/%s: %w", g.container, key, err)
	}

	return nil
}

// CreateEntry replicates a filer entry to Azure as an append blob.
// Directory entries are ignored. If the blob already exists,
// handleExistingBlob decides whether the remote copy is current (skip the
// write) or must be deleted and recreated first. Small entries carry their
// content inline in entry.Content; larger ones are streamed chunk by chunk.
func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {

	key = cleanKey(key)

	if entry.IsDirectory {
		return nil
	}

	totalSize := filer.FileSize(entry)

	// Create append blob client
	appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)

	// Try to create the blob first (without access conditions for initial creation)
	ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelCreate()
	_, err := appendBlobClient.Create(ctxCreate, nil)

	needsWrite := true
	if err != nil {
		if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
			// Handle existing blob - check if overwrite is needed and perform it if necessary
			var handleErr error
			needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize)
			if handleErr != nil {
				return handleErr
			}
		} else {
			return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
		}
	}

	// If we don't need to write (blob is up-to-date), return early
	if !needsWrite {
		return nil
	}

	writeFunc := func(data []byte) error {
		ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
		defer cancelWrite()
		_, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
		return writeErr
	}

	// Small entries embed their content directly; write it and finish.
	if len(entry.Content) > 0 {
		return writeFunc(entry.Content)
	}

	// Resolve chunk views only now that we know chunk data must be streamed:
	// ViewFromChunks goes through g.filerSource.LookupFileId, so computing it
	// before the up-to-date / inline-content early returns wastes that work.
	chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
	if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
		return err
	}

	return nil
}

// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary.
// Overwrite is implemented as conditional delete + recreate (append blobs are recreated rather than truncated in place).
// It returns:
//   - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date
//   - error: any error encountered during the operation
func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) {
	// Get the blob's properties to decide whether to overwrite.
	// Use a timeout to fail fast on network issues.
	ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelProps()
	props, propErr := appendBlobClient.GetProperties(ctxProps, nil)

	// Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state.
	if propErr != nil {
		return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr)
	}

	// Check if we can skip writing based on modification time and size.
	if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil {
		const clockSkewTolerance = int64(2) // seconds - allow small clock differences
		remoteMtime := props.LastModified.Unix()
		localMtime := entry.Attributes.Mtime
		// Skip if remote is newer/same (within skew tolerance) and has the SAME size.
		// This prevents skipping partial/corrupted files that may have a newer mtime.
		if remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == int64(totalSize) {
			glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
				g.container, key, remoteMtime, localMtime, *props.ContentLength)
			return false, nil
		}
	}

	// Blob is empty or outdated - we need to delete and recreate it.
	// REQUIRE ETag for conditional delete to avoid race conditions and data loss.
	if props.ETag == nil {
		return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key)
	}

	// IfMatch on the ETag we just read makes the delete a compare-and-swap:
	// it only succeeds if nobody changed the blob since GetProperties.
	deleteOpts := &blob.DeleteOptions{
		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
		AccessConditions: &blob.AccessConditions{
			ModifiedAccessConditions: &blob.ModifiedAccessConditions{
				IfMatch: props.ETag,
			},
		},
	}

	// Delete existing blob with conditional delete and timeout.
	ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelDel()
	_, delErr := appendBlobClient.Delete(ctxDel, deleteOpts)

	if delErr != nil {
		// If the precondition fails, the blob was modified by another process after we checked it.
		// Failing here is safe; replication will retry.
		if bloberror.HasCode(delErr, bloberror.ConditionNotMet) {
			return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
		}
		// Ignore BlobNotFound, as the goal is to delete it anyway.
		if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
			return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
		}
	}

	// Recreate the blob with timeout.
	ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelRecreate()
	_, createErr := appendBlobClient.Create(ctxRecreate, nil)

	if createErr != nil {
		// It's possible another process recreated it after our delete.
		// Failing is safe, as a retry of the whole function will handle it.
		return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
	}

	return true, nil
}

// UpdateEntry implements update-as-overwrite: the new entry is simply
// re-created at the same key. It always reports the existing entry as found.
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
	foundExistingEntry = true
	err = g.CreateEntry(cleanKey(key), newEntry, signatures)
	return
}

// cleanKey strips every leading '/' from key so it can serve as an Azure
// blob name; one TrimPrefix per iteration removes one slash at a time.
func cleanKey(key string) string {
	const slash = "/"
	for strings.HasPrefix(key, slash) {
		key = strings.TrimPrefix(key, slash)
	}
	return key
}