aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-12-10 21:49:07 -0800
committerchrislu <chris.lu@gmail.com>2022-12-10 21:49:07 -0800
commite1ca6308cb66e25692d8b874715f61c940fa5b91 (patch)
treea518fba58cc95faa7bf5ca757780b18c3548d8e8 /weed/server
parent659f28b8552e2635b2bc4566e525f1fe098204da (diff)
downloadseaweedfs-e1ca6308cb66e25692d8b874715f61c940fa5b91.tar.xz
seaweedfs-e1ca6308cb66e25692d8b874715f61c940fa5b91.zip
add chunk etag when downloading from remote storage
fix https://github.com/seaweedfs/seaweedfs/issues/3987
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_remote.go6
-rw-r--r--weed/server/volume_grpc_remote.go2
2 files changed, 7 insertions, 1 deletions
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 740aad497..27dbb1c80 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -123,8 +123,9 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
// tell filer to tell volume server to download into needles
assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
+ var etag string
err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
+ resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
@@ -141,6 +142,8 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
})
if fetchAndWriteErr != nil {
return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
+ } else {
+ etag = resp.ETag
}
return nil
})
@@ -155,6 +158,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
Offset: localOffset,
Size: uint64(size),
ModifiedTsNs: time.Now().Unix(),
+ ETag: etag,
Fid: &filer_pb.FileId{
VolumeId: uint32(fileId.VolumeId),
FileKey: uint64(fileId.Key),
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 634b27c84..20410007a 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -51,6 +51,8 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
if err == nil {
err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err)
}
+ } else {
+ resp.ETag = n.Etag()
}
}()
if len(req.Replicas) > 0 {