diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-19 12:29:49 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-19 12:29:49 -0700 |
| commit | 3b3651dea3277a8f816226ba191d106ede70f2dd (patch) | |
| tree | cc42eea0d125f9f899a23bb6a50d2a0c27c89db9 /weed/server | |
| parent | 730a032137df3d13d5dc4d21f25133edd5e5fa23 (diff) | |
| download | seaweedfs-3b3651dea3277a8f816226ba191d106ede70f2dd.tar.xz seaweedfs-3b3651dea3277a8f816226ba191d106ede70f2dd.zip | |
volume: atomic copying file, adds version and stopOffset
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 41 |
1 files changed, 33 insertions, 8 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 0cb5bb455..dbcdb1052 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -52,9 +52,13 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo return fmt.Errorf("read volume file status failed, %v", err) } + // println("source:", volFileInfoResp.String()) + copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: req.VolumeId, - IsIdxFile: true, + VolumeId: req.VolumeId, + IsIdxFile: true, + CompactionRevision: volFileInfoResp.CompactionRevision, + StopOffset: volFileInfoResp.IdxFileSize, }) if err != nil { return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err) @@ -66,8 +70,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: req.VolumeId, - IsDatFile: true, + VolumeId: req.VolumeId, + IsDatFile: true, + CompactionRevision: volFileInfoResp.CompactionRevision, + StopOffset: volFileInfoResp.DatFileSize, }) if err != nil { return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err) @@ -97,7 +103,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } return &volume_server_pb.VolumeCopyResponse{ - LastAppendAtNs:volFileInfoResp.DatFileTimestampSeconds*uint64(time.Second), + LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second), }, err } @@ -161,6 +167,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se resp.DatFileTimestampSeconds = uint64(modTime.Unix()) resp.IdxFileTimestampSeconds = uint64(modTime.Unix()) resp.FileCount = v.FileCount() + resp.CompactionRevision = uint32(v.CompactionRevision) return resp, nil } @@ -171,7 +178,13 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v return fmt.Errorf("not found volume id %d", req.VolumeId) } - const BufferSize = 1024 * 16 + if uint32(v.CompactionRevision) != req.CompactionRevision { + return fmt.Errorf("volume %d is compacted", req.VolumeId) + } + + bytesToRead := int64(req.StopOffset) + + const BufferSize = 1024 * 1024 * 2 var fileName = v.FileName() if req.IsDatFile { fileName += ".dat" @@ -186,19 +199,31 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v buffer := make([]byte, BufferSize) - for { + for bytesToRead > 0 { bytesread, err := file.Read(buffer) + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + if err != nil { if err != io.EOF { return err } + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error()) break } - stream.Send(&volume_server_pb.CopyFileResponse{ + if int64(bytesread) > bytesToRead { + bytesread = int(bytesToRead) + } + err = stream.Send(&volume_server_pb.CopyFileResponse{ FileContent: buffer[:bytesread], }) + if err != nil { + // println("sending", bytesread, "bytes err", err.Error()) + return err + } + + bytesToRead -= int64(bytesread) } |
