aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_sync.go')
-rw-r--r--weed/storage/volume_sync.go21
1 files changed, 11 insertions, 10 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
index 137a9b4ca..8d90a729d 100644
--- a/weed/storage/volume_sync.go
+++ b/weed/storage/volume_sync.go
@@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"io"
"os"
"sort"
@@ -45,12 +46,12 @@ optimized more later).
*/
-func (v *Volume) Synchronize(volumeServer string) (err error) {
+func (v *Volume) Synchronize(volumeServer string, grpcDialOption grpc.DialOption) (err error) {
var lastCompactRevision uint16 = 0
var compactRevision uint16 = 0
var masterMap *needle.CompactMap
for i := 0; i < 3; i++ {
- if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
+ if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, grpcDialOption, v.Id); err != nil {
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
}
if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
@@ -62,7 +63,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) {
}
}
lastCompactRevision = compactRevision
- if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
+ if err = v.trySynchronizing(volumeServer, grpcDialOption, masterMap, compactRevision); err == nil {
return
}
}
@@ -77,7 +78,7 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
// trySynchronizing sync with remote volume server incrementally by
// make up the local and remote delta.
-func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
+func (v *Volume) trySynchronizing(volumeServer string, grpcDialOption grpc.DialOption, masterMap *needle.CompactMap, compactRevision uint16) error {
slaveIdxFile, err := os.Open(v.nm.IndexFileName())
if err != nil {
return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
@@ -126,7 +127,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
continue
}
// add master file entry to local data file
- if err := v.fetchNeedle(volumeServer, needleValue, compactRevision); err != nil {
+ if err := v.fetchNeedle(volumeServer, grpcDialOption, needleValue, compactRevision); err != nil {
glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
return err
}
@@ -136,16 +137,16 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
return nil
}
-func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
+func fetchVolumeFileEntries(volumeServer string, grpcDialOption grpc.DialOption, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
m = needle.NewCompactMap()
- syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid))
+ syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil {
return m, 0, 0, err
}
total := 0
- err = operation.GetVolumeIdxEntries(volumeServer, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
+ err = operation.GetVolumeIdxEntries(volumeServer, grpcDialOption, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize {
m.Set(NeedleId(key), offset, size)
@@ -187,9 +188,9 @@ func (v *Volume) removeNeedle(key NeedleId) {
// fetchNeedle fetches a remote volume needle by vid, id, offset
// The compact revision is checked first in case the remote volume
// is compacted and the offset is invalid any more.
-func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error {
+func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption, needleValue needle.NeedleValue, compactRevision uint16) error {
- return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
VolumdId: uint32(v.Id),
Revision: uint32(compactRevision),