aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_copy.go
blob: 6320d62fb8d012d902606bcaa6f071f710225be2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
package weed_server

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
	src := r.URL.Query().Get("cp.from")
	dst := r.URL.Path

	glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)

	var err error
	if src, err = clearName(src); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}
	if dst, err = clearName(dst); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}
	src = strings.TrimRight(src, "/")
	if src == "" {
		err = fmt.Errorf("invalid source '/'")
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	srcPath := util.FullPath(src)
	dstPath := util.FullPath(dst)
	if dstPath.IsLongerFileName(so.MaxFileNameLength) {
		err = fmt.Errorf("dst name too long")
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
	if err != nil {
		err = fmt.Errorf("failed to get src entry '%s': %w", src, err)
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	glog.V(1).InfofCtx(ctx, "FilerServer.copy source entry: content_len=%d, chunks_len=%d", len(srcEntry.Content), len(srcEntry.GetChunks()))

	// Check if source is a directory - currently not supported for recursive copying
	if srcEntry.IsDirectory() {
		err = fmt.Errorf("copy: directory copying not yet supported for '%s'", src)
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	_, oldName := srcPath.DirAndName()
	finalDstPath := dstPath

	// Check if destination is a directory
	dstPathEntry, findErr := fs.filer.FindEntry(ctx, dstPath)
	if findErr != nil && findErr != filer_pb.ErrNotFound {
		err = fmt.Errorf("failed to check destination path %s: %w", dstPath, findErr)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}

	if findErr == nil && dstPathEntry.IsDirectory() {
		finalDstPath = dstPath.Child(oldName)
	} else {
		newDir, newName := dstPath.DirAndName()
		newName = util.Nvl(newName, oldName)
		finalDstPath = util.FullPath(newDir).Child(newName)
	}

	// Check if destination file already exists
	// TODO: add an overwrite parameter to allow overwriting
	if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
		err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	} else if dstEntry != nil {
		err = fmt.Errorf("destination file %s already exists", finalDstPath)
		writeJsonError(w, r, http.StatusConflict, err)
		return
	}

	// Copy the file content and chunks
	newEntry, err := fs.copyEntry(ctx, srcEntry, finalDstPath, so)
	if err != nil {
		err = fmt.Errorf("failed to copy entry from '%s' to '%s': %w", src, dst, err)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}

	if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
		err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}

	glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)

	w.WriteHeader(http.StatusNoContent)
}

// copyEntry creates a new entry with copied content and chunks
func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
	// Create the base entry structure
	// Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter
	// This creates an independent copy rather than another hard link to the same content
	newEntry := &filer.Entry{
		FullPath: dstPath,
		// Deep copy Attr field to ensure slice independence (GroupNames, Md5)
		Attr: func(a filer.Attr) filer.Attr {
			a.GroupNames = append([]string(nil), a.GroupNames...)
			a.Md5 = append([]byte(nil), a.Md5...)
			return a
		}(srcEntry.Attr),
		Quota: srcEntry.Quota,
		// Intentionally NOT copying HardLinkId and HardLinkCounter to create independent copy
	}

	// Deep copy Extended fields to ensure independence
	if srcEntry.Extended != nil {
		newEntry.Extended = make(map[string][]byte, len(srcEntry.Extended))
		for k, v := range srcEntry.Extended {
			newEntry.Extended[k] = append([]byte(nil), v...)
		}
	}

	// Deep copy Remote field to ensure independence
	if srcEntry.Remote != nil {
		newEntry.Remote = &filer_pb.RemoteEntry{
			StorageName:       srcEntry.Remote.StorageName,
			LastLocalSyncTsNs: srcEntry.Remote.LastLocalSyncTsNs,
			RemoteETag:        srcEntry.Remote.RemoteETag,
			RemoteMtime:       srcEntry.Remote.RemoteMtime,
			RemoteSize:        srcEntry.Remote.RemoteSize,
		}
	}

	// Log if we're copying a hard link so we can track this behavior
	if len(srcEntry.HardLinkId) > 0 {
		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copying hard link %s (nlink=%d) as independent file", srcEntry.FullPath, srcEntry.HardLinkCounter)
	}

	// Handle small files stored in Content field
	if len(srcEntry.Content) > 0 {
		// For small files, just copy the content directly
		newEntry.Content = make([]byte, len(srcEntry.Content))
		copy(newEntry.Content, srcEntry.Content)
		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied content directly, size=%d", len(newEntry.Content))
		return newEntry, nil
	}

	// Handle files stored as chunks (including resolved hard link content)
	if len(srcEntry.GetChunks()) > 0 {
		srcChunks := srcEntry.GetChunks()

		// Create HTTP client once for reuse across all chunk operations
		client := &http.Client{Timeout: 60 * time.Second}

		// Check if any chunks are manifest chunks - these require special handling
		if filer.HasChunkManifest(srcChunks) {
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
			newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so, client)
			if err != nil {
				return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
			}
			newEntry.Chunks = newChunks
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
		} else {
			// Regular chunks without manifest - copy directly
			newChunks, err := fs.copyChunks(ctx, srcChunks, so, client)
			if err != nil {
				return nil, fmt.Errorf("failed to copy chunks: %w", err)
			}
			newEntry.Chunks = newChunks
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied regular chunks, count=%d", len(newChunks))
		}
		return newEntry, nil
	}

	// Empty file case (or hard link with no content - should not happen if hard link was properly resolved)
	if len(srcEntry.HardLinkId) > 0 {
		glog.WarningfCtx(ctx, "FilerServer.copyEntry: hard link %s appears to have no content - this may indicate an issue with hard link resolution", srcEntry.FullPath)
	}
	glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: empty file, no content or chunks to copy")
	return newEntry, nil
}

// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
	if len(srcChunks) == 0 {
		return nil, nil
	}

	// Optimize: Batch volume lookup for all chunks to reduce RPC calls
	volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
	if err != nil {
		return nil, fmt.Errorf("failed to lookup volume locations: %w", err)
	}

	// Parallel chunk copying with concurrency control using errgroup
	const maxConcurrentChunks = 8 // Match SeaweedFS standard for parallel operations

	// Pre-allocate result slice to maintain order
	newChunks := make([]*filer_pb.FileChunk, len(srcChunks))

	// Use errgroup for cleaner concurrency management
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(maxConcurrentChunks) // Limit concurrent goroutines

	// Validate that all chunk locations are available before starting any concurrent work
	for _, chunk := range srcChunks {
		volumeId := chunk.Fid.VolumeId
		locations, ok := volumeLocationsMap[volumeId]
		if !ok || len(locations) == 0 {
			return nil, fmt.Errorf("no locations found for volume %d", volumeId)
		}
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: starting parallel copy of %d chunks with max concurrency %d", len(srcChunks), maxConcurrentChunks)

	// Launch goroutines for each chunk
	for i, srcChunk := range srcChunks {
		// Capture loop variables for goroutine closure
		chunkIndex := i
		chunk := srcChunk
		chunkLocations := volumeLocationsMap[srcChunk.Fid.VolumeId]

		g.Go(func() error {
			glog.V(3).InfofCtx(gCtx, "FilerServer.copyChunks: copying chunk %d/%d, size=%d", chunkIndex+1, len(srcChunks), chunk.Size)

			// Use streaming copy to avoid loading entire chunk into memory
			newChunk, err := fs.streamCopyChunk(gCtx, chunk, so, client, chunkLocations)
			if err != nil {
				return fmt.Errorf("failed to copy chunk %d (%s): %w", chunkIndex+1, chunk.GetFileIdString(), err)
			}

			// Store result at correct index to maintain order
			newChunks[chunkIndex] = newChunk

			glog.V(4).InfofCtx(gCtx, "FilerServer.copyChunks: successfully copied chunk %d/%d", chunkIndex+1, len(srcChunks))
			return nil
		})
	}

	// Wait for all chunks to complete and return first error (if any)
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Verify all chunks were copied (shouldn't happen if no errors, but safety check)
	for i, chunk := range newChunks {
		if chunk == nil {
			return nil, fmt.Errorf("chunk %d was not copied (internal error)", i)
		}
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: successfully completed parallel copy of %d chunks", len(srcChunks))
	return newChunks, nil
}

// copyChunksWithManifest handles copying chunks that include manifest chunks
func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
	if len(srcChunks) == 0 {
		return nil, nil
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing %d chunks (some are manifests)", len(srcChunks))

	// Separate manifest chunks from regular data chunks
	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(srcChunks)

	var newChunks []*filer_pb.FileChunk

	// First, copy all non-manifest chunks directly
	if len(nonManifestChunks) > 0 {
		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
		newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
		}
		newChunks = append(newChunks, newNonManifestChunks...)
	}

	// Process each manifest chunk separately
	for i, manifestChunk := range manifestChunks {
		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing manifest chunk %d/%d", i+1, len(manifestChunks))

		// Resolve the manifest chunk to get the actual data chunks it references
		lookupFileIdFn := func(ctx context.Context, fileId string) (urls []string, err error) {
			return fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
		}

		resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFileIdFn, manifestChunk)
		if err != nil {
			return nil, fmt.Errorf("failed to resolve manifest chunk %s: %w", manifestChunk.GetFileIdString(), err)
		}

		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: resolved manifest chunk %s to %d data chunks",
			manifestChunk.GetFileIdString(), len(resolvedChunks))

		// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
		newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
		}

		// Create a new manifest chunk that references the copied data chunks
		newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
		}

		newChunks = append(newChunks, newManifestChunk)

		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: created new manifest chunk %s for %d resolved chunks",
			newManifestChunk.GetFileIdString(), len(newResolvedChunks))
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: completed copying %d total chunks (%d manifest, %d regular)",
		len(newChunks), len(manifestChunks), len(nonManifestChunks))

	return newChunks, nil
}

// createManifestChunk creates a new manifest chunk that references the provided data chunks
func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) (*filer_pb.FileChunk, error) {
	// Create the manifest data structure
	filer_pb.BeforeEntrySerialization(dataChunks)

	manifestData := &filer_pb.FileChunkManifest{
		Chunks: dataChunks,
	}

	// Serialize the manifest
	data, err := proto.Marshal(manifestData)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal manifest: %w", err)
	}

	// Save the manifest data as a new chunk
	saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
		// Assign a new file ID
		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
		if assignErr != nil {
			return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
		}

		// Upload the manifest data
		err = fs.uploadData(ctx, reader, urlLocation, string(auth), client)
		if err != nil {
			return nil, fmt.Errorf("failed to upload manifest data: %w", err)
		}

		// Create the chunk metadata
		chunk = &filer_pb.FileChunk{
			FileId: fileId,
			Offset: offset,
			Size:   uint64(len(data)),
		}
		return chunk, nil
	}

	manifestChunk, err := saveFunc(bytes.NewReader(data), "", originalManifest.Offset, 0)
	if err != nil {
		return nil, fmt.Errorf("failed to save manifest chunk: %w", err)
	}

	// Set manifest-specific properties
	manifestChunk.IsChunkManifest = true
	manifestChunk.Size = originalManifest.Size

	return manifestChunk, nil
}

// uploadData uploads data to a volume server
func (fs *FilerServer) uploadData(ctx context.Context, reader io.Reader, urlLocation, auth string, client *http.Client) error {
	req, err := http.NewRequestWithContext(ctx, "PUT", urlLocation, reader)
	if err != nil {
		return fmt.Errorf("failed to create upload request: %w", err)
	}

	if auth != "" {
		req.Header.Set("Authorization", "Bearer "+auth)
	}

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to upload data: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return fmt.Errorf("upload failed with status %d, and failed to read response: %w", resp.StatusCode, readErr)
		}
		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
	}

	return nil
}

// batchLookupVolumeLocations performs a single batched lookup for all unique volume IDs in the chunks
func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []*filer_pb.FileChunk) (map[uint32][]operation.Location, error) {
	// Collect unique volume IDs and their string representations to avoid repeated conversions
	volumeIdMap := make(map[uint32]string)
	for _, chunk := range chunks {
		vid := chunk.Fid.VolumeId
		if _, found := volumeIdMap[vid]; !found {
			volumeIdMap[vid] = fmt.Sprintf("%d", vid)
		}
	}

	if len(volumeIdMap) == 0 {
		return make(map[uint32][]operation.Location), nil
	}

	// Convert to slice of strings for the lookup call
	volumeIdStrs := make([]string, 0, len(volumeIdMap))
	for _, vidStr := range volumeIdMap {
		volumeIdStrs = append(volumeIdStrs, vidStr)
	}

	// Perform single batched lookup
	lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster, fs.grpcDialOption, volumeIdStrs)
	if err != nil {
		return nil, fmt.Errorf("failed to lookup volumes: %w", err)
	}

	// Convert result to map of volumeId -> locations
	volumeLocationsMap := make(map[uint32][]operation.Location)
	for volumeId, volumeIdStr := range volumeIdMap {
		if volumeLocations, ok := lookupResult[volumeIdStr]; ok && len(volumeLocations.Locations) > 0 {
			volumeLocationsMap[volumeId] = volumeLocations.Locations
		}
	}

	return volumeLocationsMap, nil
}

// streamCopyChunk copies a chunk using streaming to minimize memory usage
func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
	// Assign a new file ID for destination
	fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
	if err != nil {
		return nil, fmt.Errorf("failed to assign new file ID: %w", err)
	}

	// Try all available locations for source chunk until one succeeds
	fileIdString := srcChunk.GetFileIdString()
	var lastErr error

	for i, location := range locations {
		srcUrl := fmt.Sprintf("http://%s/%s", location.Url, fileIdString)
		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: attempting streaming copy from %s to %s (attempt %d/%d)", srcUrl, urlLocation, i+1, len(locations))

		// Perform streaming copy using HTTP client
		err := fs.performStreamCopy(ctx, srcUrl, urlLocation, string(auth), srcChunk.Size, client)
		if err != nil {
			lastErr = err
			glog.V(2).InfofCtx(ctx, "FilerServer.streamCopyChunk: failed streaming copy from %s: %v", srcUrl, err)
			continue
		}

		// Success - create chunk metadata
		newChunk := &filer_pb.FileChunk{
			FileId: fileId,
			Offset: srcChunk.Offset,
			Size:   srcChunk.Size,
			ETag:   srcChunk.ETag,
		}

		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: successfully streamed %d bytes", srcChunk.Size)
		return newChunk, nil
	}

	// All locations failed
	return nil, fmt.Errorf("failed to stream copy chunk from any location: %w", lastErr)
}

// performStreamCopy performs the actual streaming copy from source URL to destination URL
func (fs *FilerServer) performStreamCopy(ctx context.Context, srcUrl, dstUrl, auth string, expectedSize uint64, client *http.Client) error {
	// Create HTTP request to read from source
	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
	if err != nil {
		return fmt.Errorf("failed to create source request: %v", err)
	}

	// Perform source request
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to read from source: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("source returned status %d", resp.StatusCode)
	}

	// Create HTTP request to write to destination
	dstReq, err := http.NewRequestWithContext(ctx, "PUT", dstUrl, resp.Body)
	if err != nil {
		return fmt.Errorf("failed to create destination request: %v", err)
	}
	dstReq.ContentLength = int64(expectedSize)

	// Set authorization header if provided
	if auth != "" {
		dstReq.Header.Set("Authorization", "Bearer "+auth)
	}
	dstReq.Header.Set("Content-Type", "application/octet-stream")

	// Perform destination request
	dstResp, err := client.Do(dstReq)
	if err != nil {
		return fmt.Errorf("failed to write to destination: %v", err)
	}
	defer dstResp.Body.Close()

	if dstResp.StatusCode != http.StatusCreated && dstResp.StatusCode != http.StatusOK {
		// Read error response body for more details
		body, readErr := io.ReadAll(dstResp.Body)
		if readErr != nil {
			return fmt.Errorf("destination returned status %d, and failed to read body: %w", dstResp.StatusCode, readErr)
		}
		return fmt.Errorf("destination returned status %d: %s", dstResp.StatusCode, string(body))
	}

	glog.V(4).InfofCtx(ctx, "FilerServer.performStreamCopy: successfully streamed data from %s to %s", srcUrl, dstUrl)
	return nil
}