diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-02-18 12:11:52 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-02-18 12:11:52 -0800 |
| commit | 77b9af531d18e10b04b49b069b5f26a329ed4902 (patch) | |
| tree | cae2524dfc445b352e5d6bab7a82f7af46b7a4c8 /weed/storage/volume_sync.go | |
| parent | 55761ae806bc7cc8ab34424508aee5481131b941 (diff) | |
| download | seaweedfs-77b9af531d18e10b04b49b069b5f26a329ed4902.tar.xz seaweedfs-77b9af531d18e10b04b49b069b5f26a329ed4902.zip | |
adding grpc mutual tls
Diffstat (limited to 'weed/storage/volume_sync.go')
| -rw-r--r-- | weed/storage/volume_sync.go | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index 137a9b4ca..8d90a729d 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "google.golang.org/grpc" "io" "os" "sort" @@ -45,12 +46,12 @@ optimized more later). */ -func (v *Volume) Synchronize(volumeServer string) (err error) { +func (v *Volume) Synchronize(volumeServer string, grpcDialOption grpc.DialOption) (err error) { var lastCompactRevision uint16 = 0 var compactRevision uint16 = 0 var masterMap *needle.CompactMap for i := 0; i < 3; i++ { - if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { + if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, grpcDialOption, v.Id); err != nil { return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) } if lastCompactRevision != compactRevision && lastCompactRevision != 0 { @@ -62,7 +63,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) { } } lastCompactRevision = compactRevision - if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil { + if err = v.trySynchronizing(volumeServer, grpcDialOption, masterMap, compactRevision); err == nil { return } } @@ -77,7 +78,7 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } // trySynchronizing sync with remote volume server incrementally by // make up the local and remote delta. -func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error { +func (v *Volume) trySynchronizing(volumeServer string, grpcDialOption grpc.DialOption, masterMap *needle.CompactMap, compactRevision uint16) error { slaveIdxFile, err := os.Open(v.nm.IndexFileName()) if err != nil { return fmt.Errorf("Open volume %d index file: %v", v.Id, err) @@ -126,7 +127,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact continue } // add master file entry to local data file - if err := v.fetchNeedle(volumeServer, needleValue, compactRevision); err != nil { + if err := v.fetchNeedle(volumeServer, grpcDialOption, needleValue, compactRevision); err != nil { glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err) return err } @@ -136,16 +137,16 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact return nil } -func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) { +func fetchVolumeFileEntries(volumeServer string, grpcDialOption grpc.DialOption, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) { m = needle.NewCompactMap() - syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid)) + syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) if err != nil { return m, 0, 0, err } total := 0 - err = operation.GetVolumeIdxEntries(volumeServer, uint32(vid), func(key NeedleId, offset Offset, size uint32) { + err = operation.GetVolumeIdxEntries(volumeServer, grpcDialOption, uint32(vid), func(key NeedleId, offset Offset, size uint32) { // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) if offset > 0 && size != TombstoneFileSize { m.Set(NeedleId(key), offset, size) @@ -187,9 +188,9 @@ func (v *Volume) removeNeedle(key NeedleId) { // fetchNeedle fetches a remote volume needle by vid, id, offset // The compact revision is checked first in case the remote volume // is compacted and the offset is invalid any more. -func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error { +func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption, needleValue needle.NeedleValue, compactRevision uint16) error { - return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{ VolumdId: uint32(v.Id), Revision: uint32(compactRevision), |
