aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-19 12:29:49 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-19 12:29:49 -0700
commit3b3651dea3277a8f816226ba191d106ede70f2dd (patch)
treecc42eea0d125f9f899a23bb6a50d2a0c27c89db9 /weed/server
parent730a032137df3d13d5dc4d21f25133edd5e5fa23 (diff)
downloadseaweedfs-3b3651dea3277a8f816226ba191d106ede70f2dd.tar.xz
seaweedfs-3b3651dea3277a8f816226ba191d106ede70f2dd.zip
volume: atomic copying file, adds version and stopOffset
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_copy.go41
1 files changed, 33 insertions, 8 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 0cb5bb455..dbcdb1052 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -52,9 +52,13 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
+ // println("source:", volFileInfoResp.String())
+
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: req.VolumeId,
- IsIdxFile: true,
+ VolumeId: req.VolumeId,
+ IsIdxFile: true,
+ CompactionRevision: volFileInfoResp.CompactionRevision,
+ StopOffset: volFileInfoResp.IdxFileSize,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
@@ -66,8 +70,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: req.VolumeId,
- IsDatFile: true,
+ VolumeId: req.VolumeId,
+ IsDatFile: true,
+ CompactionRevision: volFileInfoResp.CompactionRevision,
+ StopOffset: volFileInfoResp.DatFileSize,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
@@ -97,7 +103,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
return &volume_server_pb.VolumeCopyResponse{
- LastAppendAtNs:volFileInfoResp.DatFileTimestampSeconds*uint64(time.Second),
+ LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
}, err
}
@@ -161,6 +167,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
resp.DatFileTimestampSeconds = uint64(modTime.Unix())
resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
resp.FileCount = v.FileCount()
+ resp.CompactionRevision = uint32(v.CompactionRevision)
return resp, nil
}
@@ -171,7 +178,13 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
- const BufferSize = 1024 * 16
+ if uint32(v.CompactionRevision) != req.CompactionRevision {
+ return fmt.Errorf("volume %d is compacted", req.VolumeId)
+ }
+
+ bytesToRead := int64(req.StopOffset)
+
+ const BufferSize = 1024 * 1024 * 2
var fileName = v.FileName()
if req.IsDatFile {
fileName += ".dat"
@@ -186,19 +199,31 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
buffer := make([]byte, BufferSize)
- for {
+ for bytesToRead > 0 {
bytesread, err := file.Read(buffer)
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+
if err != nil {
if err != io.EOF {
return err
}
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
break
}
- stream.Send(&volume_server_pb.CopyFileResponse{
+ if int64(bytesread) > bytesToRead {
+ bytesread = int(bytesToRead)
+ }
+ err = stream.Send(&volume_server_pb.CopyFileResponse{
FileContent: buffer[:bytesread],
})
+ if err != nil {
+ // println("sending", bytesread, "bytes err", err.Error())
+ return err
+ }
+
+ bytesToRead -= int64(bytesread)
}