diff options
Diffstat (limited to 'weed/storage/volume_sync.go')
| -rw-r--r-- | weed/storage/volume_sync.go | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index 23d8db510..d7cae8803 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -48,7 +49,7 @@ optimized more later). func (v *Volume) Synchronize(volumeServer string) (err error) { var lastCompactRevision uint16 = 0 var compactRevision uint16 = 0 - var masterMap CompactMap + var masterMap *needle.CompactMap for i := 0; i < 3; i++ { if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) @@ -69,7 +70,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) { return } -type ByOffset []NeedleValue +type ByOffset []needle.NeedleValue func (a ByOffset) Len() int { return len(a) } func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -77,18 +78,18 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } // trySynchronizing sync with remote volume server incrementally by // make up the local and remote delta. -func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error { +func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error { slaveIdxFile, err := os.Open(v.nm.IndexFileName()) if err != nil { return fmt.Errorf("Open volume %d index file: %v", v.Id, err) } defer slaveIdxFile.Close() - slaveMap, err := LoadNeedleMap(slaveIdxFile) + slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile) if err != nil { return fmt.Errorf("Load volume %d index file: %v", v.Id, err) } - var delta []NeedleValue - if err := masterMap.Visit(func(needleValue NeedleValue) error { + var delta []needle.NeedleValue + if err := masterMap.Visit(func(needleValue needle.NeedleValue) error { if needleValue.Key == 0 { return nil } @@ -100,7 +101,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com }); err != nil { return fmt.Errorf("Add master entry: %v", err) } - if err := slaveMap.m.Visit(func(needleValue NeedleValue) error { + if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error { if needleValue.Key == 0 { return nil } @@ -137,8 +138,8 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com return nil } -func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) { - m = NewCompactMap() +func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) { + m = needle.NewCompactMap() syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) if err != nil { @@ -149,9 +150,9 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) if offset > 0 && size != TombstoneFileSize { - m.Set(Key(key), offset, size) + m.Set(needle.Key(key), offset, size) } else { - m.Delete(Key(key)) + m.Delete(needle.Key(key)) } total++ }) @@ -178,7 +179,7 @@ func (v *Volume) IndexFileContent() ([]byte, error) { } // removeNeedle removes one needle by needle key -func (v *Volume) removeNeedle(key Key) { +func (v *Volume) removeNeedle(key needle.Key) { n := new(Needle) n.Id = uint64(key) v.deleteNeedle(n) @@ -188,7 +189,7 @@ func (v *Volume) removeNeedle(key Key) { // The compact revision is checked first in case the remote volume // is compacted and the offset is invalid any more. func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, - needleValue NeedleValue, compactRevision uint16) error { + needleValue needle.NeedleValue, compactRevision uint16) error { // add master file entry to local data file values := make(url.Values) values.Add("revision", strconv.Itoa(int(compactRevision))) |
