aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_sync.go')
-rw-r--r--weed/storage/volume_sync.go17
1 files changed, 15 insertions, 2 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
index 6eb7a61a5..137a9b4ca 100644
--- a/weed/storage/volume_sync.go
+++ b/weed/storage/volume_sync.go
@@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
+ "io"
"os"
"sort"
@@ -164,6 +165,7 @@ func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusRespons
if stat, err := v.dataFile.Stat(); err == nil {
syncStatus.TailOffset = uint64(stat.Size())
}
+ syncStatus.Collection = v.Collection
syncStatus.IdxFileSize = v.nm.IndexFileSize()
syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
syncStatus.Ttl = v.SuperBlock.Ttl.String()
@@ -188,7 +190,7 @@ func (v *Volume) removeNeedle(key NeedleId) {
func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error {
return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error {
- resp, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
+ stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
VolumdId: uint32(v.Id),
Revision: uint32(compactRevision),
Offset: uint32(needleValue.Offset),
@@ -198,8 +200,19 @@ func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue
if err != nil {
return err
}
+ var fileContent []byte
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("read needle %v: %v", needleValue.Key.String(), err)
+ }
+ fileContent = append(fileContent, resp.FileContent...)
+ }
- offset, err := v.AppendBlob(resp.FileContent)
+ offset, err := v.AppendBlob(fileContent)
if err != nil {
return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
}