aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_backup.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-10-29 00:35:16 -0700
committerChris Lu <chris.lu@gmail.com>2019-10-29 00:35:16 -0700
commit19b6a16003325ec93bb0e261d5a9c08cd3e03cad (patch)
treebd0c38a7dec60904418173a8f0825e63f965d255 /weed/storage/volume_backup.go
parenteb2172f63fcdf7f5455c142daaceb6b1a489f7f4 (diff)
downloadseaweedfs-19b6a16003325ec93bb0e261d5a9c08cd3e03cad.tar.xz
seaweedfs-19b6a16003325ec93bb0e261d5a9c08cd3e03cad.zip
changed from os.file to backend.DataStorageBackend
Diffstat (limited to 'weed/storage/volume_backup.go')
-rw-r--r--weed/storage/volume_backup.go17
1 files changed, 9 insertions, 8 deletions
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index f48ccbb68..fe0506917 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -19,8 +19,8 @@ func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusRespons
defer v.dataFileAccessLock.Unlock()
var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
- if stat, err := v.dataFile.Stat(); err == nil {
- syncStatus.TailOffset = uint64(stat.Size())
+ if datSize, _, err := v.DataBackend.GetStat(); err == nil {
+ syncStatus.TailOffset = uint64(datSize)
}
syncStatus.Collection = v.Collection
syncStatus.IdxFileSize = v.nm.IndexFileSize()
@@ -70,6 +70,8 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
return err
}
+ writeOffset := int64(startFromOffset)
+
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
@@ -80,8 +82,6 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
return err
}
- v.dataFile.Seek(int64(startFromOffset), io.SeekStart)
-
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
@@ -92,10 +92,11 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
}
- _, writeErr := v.dataFile.Write(resp.FileContent)
+ n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset)
if writeErr != nil {
return writeErr
}
+ writeOffset += int64(n)
}
return nil
@@ -107,7 +108,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
// add to needle map
- return ScanVolumeFileFrom(v.version, v.dataFile, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
+ return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
@@ -153,11 +154,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
- n, _, bodyLength, err := needle.ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset())
+ n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset())
if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
}
- _, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}