diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-10-29 00:35:16 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-10-29 00:35:16 -0700 |
| commit | 19b6a16003325ec93bb0e261d5a9c08cd3e03cad (patch) | |
| tree | bd0c38a7dec60904418173a8f0825e63f965d255 /weed/storage/volume_backup.go | |
| parent | eb2172f63fcdf7f5455c142daaceb6b1a489f7f4 (diff) | |
| download | seaweedfs-19b6a16003325ec93bb0e261d5a9c08cd3e03cad.tar.xz seaweedfs-19b6a16003325ec93bb0e261d5a9c08cd3e03cad.zip | |
changed from os.file to backend.DataStorageBackend
Diffstat (limited to 'weed/storage/volume_backup.go')
| -rw-r--r-- | weed/storage/volume_backup.go | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index f48ccbb68..fe0506917 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -19,8 +19,8 @@ func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusRespons defer v.dataFileAccessLock.Unlock() var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{} - if stat, err := v.dataFile.Stat(); err == nil { - syncStatus.TailOffset = uint64(stat.Size()) + if datSize, _, err := v.DataBackend.GetStat(); err == nil { + syncStatus.TailOffset = uint64(datSize) } syncStatus.Collection = v.Collection syncStatus.IdxFileSize = v.nm.IndexFileSize() @@ -70,6 +70,8 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial return err } + writeOffset := int64(startFromOffset) + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{ @@ -80,8 +82,6 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial return err } - v.dataFile.Seek(int64(startFromOffset), io.SeekStart) - for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -92,10 +92,11 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial } } - _, writeErr := v.dataFile.Write(resp.FileContent) + n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset) if writeErr != nil { return writeErr } + writeOffset += int64(n) } return nil @@ -107,7 +108,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial } // add to needle map - return ScanVolumeFileFrom(v.version, v.dataFile, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v}) + return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v}) } @@ -153,11 +154,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, _, bodyLength, err := needle.ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) + n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()) if err != nil { return 0, fmt.Errorf("ReadNeedleHeader: %v", err) } - _, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) + _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) if err != nil { return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) } |
