aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/sync_volume.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-12-22 11:10:08 -0800
committerChris Lu <chris.lu@gmail.com>2018-12-22 11:10:08 -0800
commit5333f2984a47e8712e8fc616a1a22ab297e1926f (patch)
tree5b0c2c83f1206021142bb92d1656fe0835d11aaf /weed/operation/sync_volume.go
parent36d13355bbfcc13233b081e9199a892c67a32865 (diff)
downloadseaweedfs-5333f2984a47e8712e8fc616a1a22ab297e1926f.tar.xz
seaweedfs-5333f2984a47e8712e8fc616a1a22ab297e1926f.zip
streaming updates for large entries or large index file
fix https://github.com/chrislusf/seaweedfs/issues/801
Diffstat (limited to 'weed/operation/sync_volume.go')
-rw-r--r--weed/operation/sync_volume.go21
1 files changed, 18 insertions, 3 deletions
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index 662184656..ac2e2bf79 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -2,6 +2,8 @@ package operation
import (
"context"
+ "fmt"
+ "io"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -23,17 +25,30 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
- resp, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
+ stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})
if err != nil {
return err
}
- dataSize := len(resp.IndexFileContent)
+ var indexFileContent []byte
+
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("read index entries: %v", err)
+ }
+ indexFileContent = append(indexFileContent, resp.IndexFileContent...)
+ }
+
+ dataSize := len(indexFileContent)
for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize {
- line := resp.IndexFileContent[idx : idx+NeedleEntrySize]
+ line := indexFileContent[idx : idx+NeedleEntrySize]
key := BytesToNeedleId(line[:NeedleIdSize])
offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize])
size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])