aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_sync.go')
-rw-r--r--weed/storage/volume_sync.go27
1 files changed, 14 insertions, 13 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
index 23d8db510..d7cae8803 100644
--- a/weed/storage/volume_sync.go
+++ b/weed/storage/volume_sync.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -48,7 +49,7 @@ optimized more later).
func (v *Volume) Synchronize(volumeServer string) (err error) {
var lastCompactRevision uint16 = 0
var compactRevision uint16 = 0
- var masterMap CompactMap
+ var masterMap *needle.CompactMap
for i := 0; i < 3; i++ {
if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
@@ -69,7 +70,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) {
return
}
-type ByOffset []NeedleValue
+type ByOffset []needle.NeedleValue
func (a ByOffset) Len() int { return len(a) }
func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@@ -77,18 +78,18 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
// trySynchronizing sync with remote volume server incrementally by
// make up the local and remote delta.
-func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
+func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
slaveIdxFile, err := os.Open(v.nm.IndexFileName())
if err != nil {
return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
}
defer slaveIdxFile.Close()
- slaveMap, err := LoadNeedleMap(slaveIdxFile)
+ slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
if err != nil {
return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
}
- var delta []NeedleValue
- if err := masterMap.Visit(func(needleValue NeedleValue) error {
+ var delta []needle.NeedleValue
+ if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
if needleValue.Key == 0 {
return nil
}
@@ -100,7 +101,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com
}); err != nil {
return fmt.Errorf("Add master entry: %v", err)
}
- if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
+ if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
if needleValue.Key == 0 {
return nil
}
@@ -137,8 +138,8 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com
return nil
}
-func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
- m = NewCompactMap()
+func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
+ m = needle.NewCompactMap()
syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil {
@@ -149,9 +150,9 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize {
- m.Set(Key(key), offset, size)
+ m.Set(needle.Key(key), offset, size)
} else {
- m.Delete(Key(key))
+ m.Delete(needle.Key(key))
}
total++
})
@@ -178,7 +179,7 @@ func (v *Volume) IndexFileContent() ([]byte, error) {
}
// removeNeedle removes one needle by needle key
-func (v *Volume) removeNeedle(key Key) {
+func (v *Volume) removeNeedle(key needle.Key) {
n := new(Needle)
n.Id = uint64(key)
v.deleteNeedle(n)
@@ -188,7 +189,7 @@ func (v *Volume) removeNeedle(key Key) {
// The compact revision is checked first in case the remote volume
// is compacted and the offset is invalid any more.
func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
- needleValue NeedleValue, compactRevision uint16) error {
+ needleValue needle.NeedleValue, compactRevision uint16) error {
// add master file entry to local data file
values := make(url.Values)
values.Add("revision", strconv.Itoa(int(compactRevision)))