aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2017-05-26 22:51:25 -0700
committerChris Lu <chris.lu@gmail.com>2017-05-26 22:51:25 -0700
commit82c3ccc8ddb1fea27a15f0610cf7730deb4529bc (patch)
treeb1deba002de8d716ca395c3b29c43448a4522f7d
parent80cefade65ecf965cde769240594f17b0a562304 (diff)
downloadseaweedfs-82c3ccc8ddb1fea27a15f0610cf7730deb4529bc.tar.xz
seaweedfs-82c3ccc8ddb1fea27a15f0610cf7730deb4529bc.zip
add btree for volume index
-rw-r--r--weed/command/export.go2
-rw-r--r--weed/command/fix.go2
-rw-r--r--weed/command/server.go4
-rw-r--r--weed/command/volume.go4
-rw-r--r--weed/storage/needle/btree_map.go52
-rw-r--r--weed/storage/needle/compact_map.go194
-rw-r--r--weed/storage/needle/compact_map_perf_test.go (renamed from weed/storage/compact_map_perf_test.go)9
-rw-r--r--weed/storage/needle/compact_map_test.go (renamed from weed/storage/compact_map_test.go)2
-rw-r--r--weed/storage/needle/needle_value.go28
-rw-r--r--weed/storage/needle/needle_value_map.go8
-rw-r--r--weed/storage/needle_map.go4
-rw-r--r--weed/storage/needle_map_boltdb.go7
-rw-r--r--weed/storage/needle_map_leveldb.go7
-rw-r--r--weed/storage/needle_map_memory.go40
-rw-r--r--weed/storage/volume_loading.go15
-rw-r--r--weed/storage/volume_sync.go27
-rw-r--r--weed/storage/volume_vacuum.go4
17 files changed, 361 insertions, 48 deletions
diff --git a/weed/command/export.go b/weed/command/export.go
index 5a7dc71d9..0f7496472 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -118,7 +118,7 @@ func runExport(cmd *Command, args []string) bool {
}
defer indexFile.Close()
- needleMap, err := storage.LoadNeedleMap(indexFile)
+ needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
if err != nil {
glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
}
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 22480dcd0..f3103c6c2 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -43,7 +43,7 @@ func runFix(cmd *Command, args []string) bool {
}
defer indexFile.Close()
- nm := storage.NewNeedleMap(indexFile)
+ nm := storage.NewBtreeNeedleMap(indexFile)
defer nm.Close()
vid := storage.VolumeId(*fixVolumeId)
diff --git a/weed/command/server.go b/weed/command/server.go
index e1152f23f..ad6916b8f 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -72,7 +72,7 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
+ volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
@@ -276,6 +276,8 @@ func runServer(cmd *Command, args []string) bool {
volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb
+ case "btree":
+ volumeNeedleMapKind = storage.NeedleMapBtree
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *volumeServerPublicUrl,
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 0e69325b6..a4e316ecb 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -52,7 +52,7 @@ func init() {
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
- v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
+ v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.")
@@ -126,6 +126,8 @@ func runVolume(cmd *Command, args []string) bool {
volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb
+ case "btree":
+ volumeNeedleMapKind = storage.NeedleMapBtree
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
diff --git a/weed/storage/needle/btree_map.go b/weed/storage/needle/btree_map.go
new file mode 100644
index 000000000..64c0bacc1
--- /dev/null
+++ b/weed/storage/needle/btree_map.go
@@ -0,0 +1,52 @@
+package needle
+
+import (
+ "github.com/google/btree"
+)
+
+//This map assumes mostly inserting increasing keys
+type BtreeMap struct {
+ tree *btree.BTree
+}
+
+func NewBtreeMap() *BtreeMap {
+ return &BtreeMap{
+ tree: btree.New(32),
+ }
+}
+
+func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
+ found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size})
+ if found != nil {
+ old := found.(NeedleValue)
+ return old.Offset, old.Size
+ }
+ return
+}
+
+func (cm *BtreeMap) Delete(key Key) (oldSize uint32) {
+ found := cm.tree.Delete(NeedleValue{key, 0, 0})
+ if found != nil {
+ old := found.(NeedleValue)
+ return old.Size
+ }
+ return
+}
+func (cm *BtreeMap) Get(key Key) (*NeedleValue, bool) {
+ found := cm.tree.Get(NeedleValue{key, 0, 0})
+ if found != nil {
+ old := found.(NeedleValue)
+ return &old, true
+ }
+ return nil, false
+}
+
+// Visit visits all entries or stop if any error when visiting
+func (cm *BtreeMap) Visit(visit func(NeedleValue) error) (ret error) {
+ cm.tree.Ascend(func(item btree.Item) bool {
+ needle := item.(NeedleValue)
+ ret = visit(needle)
+ return ret == nil
+ })
+ return ret
+}
diff --git a/weed/storage/needle/compact_map.go b/weed/storage/needle/compact_map.go
new file mode 100644
index 000000000..ea2360fa7
--- /dev/null
+++ b/weed/storage/needle/compact_map.go
@@ -0,0 +1,194 @@
+package needle
+
+import (
+ "sync"
+)
+
+type CompactSection struct {
+ sync.RWMutex
+ values []NeedleValue
+ overflow map[Key]NeedleValue
+ start Key
+ end Key
+ counter int
+}
+
+func NewCompactSection(start Key) *CompactSection {
+ return &CompactSection{
+ values: make([]NeedleValue, batch),
+ overflow: make(map[Key]NeedleValue),
+ start: start,
+ }
+}
+
+//return old entry size
+func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
+ cs.Lock()
+ if key > cs.end {
+ cs.end = key
+ }
+ if i := cs.binarySearchValues(key); i >= 0 {
+ oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
+ //println("key", key, "old size", ret)
+ cs.values[i].Offset, cs.values[i].Size = offset, size
+ } else {
+ needOverflow := cs.counter >= batch
+ needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
+ if needOverflow {
+ //println("start", cs.start, "counter", cs.counter, "key", key)
+ if oldValue, found := cs.overflow[key]; found {
+ oldOffset, oldSize = oldValue.Offset, oldValue.Size
+ }
+ cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
+ } else {
+ p := &cs.values[cs.counter]
+ p.Key, p.Offset, p.Size = key, offset, size
+ //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
+ cs.counter++
+ }
+ }
+ cs.Unlock()
+ return
+}
+
+//return old entry size
+func (cs *CompactSection) Delete(key Key) uint32 {
+ cs.Lock()
+ ret := uint32(0)
+ if i := cs.binarySearchValues(key); i >= 0 {
+ if cs.values[i].Size > 0 {
+ ret = cs.values[i].Size
+ cs.values[i].Size = 0
+ }
+ }
+ if v, found := cs.overflow[key]; found {
+ delete(cs.overflow, key)
+ ret = v.Size
+ }
+ cs.Unlock()
+ return ret
+}
+func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
+ cs.RLock()
+ if v, ok := cs.overflow[key]; ok {
+ cs.RUnlock()
+ return &v, true
+ }
+ if i := cs.binarySearchValues(key); i >= 0 {
+ cs.RUnlock()
+ return &cs.values[i], true
+ }
+ cs.RUnlock()
+ return nil, false
+}
+func (cs *CompactSection) binarySearchValues(key Key) int {
+ l, h := 0, cs.counter-1
+ if h >= 0 && cs.values[h].Key < key {
+ return -2
+ }
+ //println("looking for key", key)
+ for l <= h {
+ m := (l + h) / 2
+ //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
+ if cs.values[m].Key < key {
+ l = m + 1
+ } else if key < cs.values[m].Key {
+ h = m - 1
+ } else {
+ //println("found", m)
+ return m
+ }
+ }
+ return -1
+}
+
+//This map assumes mostly inserting increasing keys
+//This map assumes mostly inserting increasing keys
+type CompactMap struct {
+ list []*CompactSection
+}
+
+func NewCompactMap() *CompactMap {
+ return &CompactMap{}
+}
+
+func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ //println(x, "creating", len(cm.list), "section, starting", key)
+ cm.list = append(cm.list, NewCompactSection(key))
+ x = len(cm.list) - 1
+ //keep compact section sorted by start
+ for x > 0 {
+ if cm.list[x-1].start > cm.list[x].start {
+ cm.list[x-1], cm.list[x] = cm.list[x], cm.list[x-1]
+ x = x - 1
+ } else {
+ break
+ }
+ }
+ }
+ return cm.list[x].Set(key, offset, size)
+}
+func (cm *CompactMap) Delete(key Key) uint32 {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ return uint32(0)
+ }
+ return cm.list[x].Delete(key)
+}
+func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ return nil, false
+ }
+ return cm.list[x].Get(key)
+}
+func (cm *CompactMap) binarySearchCompactSection(key Key) int {
+ l, h := 0, len(cm.list)-1
+ if h < 0 {
+ return -5
+ }
+ if cm.list[h].start <= key {
+ if cm.list[h].counter < batch || key <= cm.list[h].end {
+ return h
+ }
+ return -4
+ }
+ for l <= h {
+ m := (l + h) / 2
+ if key < cm.list[m].start {
+ h = m - 1
+ } else { // cm.list[m].start <= key
+ if cm.list[m+1].start <= key {
+ l = m + 1
+ } else {
+ return m
+ }
+ }
+ }
+ return -3
+}
+
+// Visit visits all entries or stop if any error when visiting
+func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
+ for _, cs := range cm.list {
+ cs.RLock()
+ for _, v := range cs.overflow {
+ if err := visit(v); err != nil {
+ cs.RUnlock()
+ return err
+ }
+ }
+ for _, v := range cs.values {
+ if _, found := cs.overflow[v.Key]; !found {
+ if err := visit(v); err != nil {
+ cs.RUnlock()
+ return err
+ }
+ }
+ }
+ cs.RUnlock()
+ }
+ return nil
+}
diff --git a/weed/storage/compact_map_perf_test.go b/weed/storage/needle/compact_map_perf_test.go
index cc7669139..8a26e7ed3 100644
--- a/weed/storage/compact_map_perf_test.go
+++ b/weed/storage/needle/compact_map_perf_test.go
@@ -1,4 +1,4 @@
-package storage
+package needle
import (
"log"
@@ -11,15 +11,15 @@ import (
func TestMemoryUsage(t *testing.T) {
- indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
+ indexFile, ie := os.OpenFile("../../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
if ie != nil {
log.Fatalln(ie)
}
- LoadNewNeedleMap(indexFile)
+ loadNewNeedleMap(indexFile)
}
-func LoadNewNeedleMap(file *os.File) CompactMap {
+func loadNewNeedleMap(file *os.File) {
m := NewCompactMap()
bytes := make([]byte, 16*1024)
count, e := file.Read(bytes)
@@ -41,5 +41,4 @@ func LoadNewNeedleMap(file *os.File) CompactMap {
count, e = file.Read(bytes)
}
- return m
}
diff --git a/weed/storage/compact_map_test.go b/weed/storage/needle/compact_map_test.go
index 1ccb48edb..4d574bafe 100644
--- a/weed/storage/compact_map_test.go
+++ b/weed/storage/needle/compact_map_test.go
@@ -1,4 +1,4 @@
-package storage
+package needle
import (
"testing"
diff --git a/weed/storage/needle/needle_value.go b/weed/storage/needle/needle_value.go
new file mode 100644
index 000000000..137ab0814
--- /dev/null
+++ b/weed/storage/needle/needle_value.go
@@ -0,0 +1,28 @@
+package needle
+
+import (
+ "strconv"
+
+ "github.com/google/btree"
+)
+
+const (
+ batch = 100000
+)
+
+type NeedleValue struct {
+ Key Key
+ Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
+ Size uint32 `comment:"Size of the data portion"`
+}
+
+func (this NeedleValue) Less(than btree.Item) bool {
+ that := than.(NeedleValue)
+ return this.Key < that.Key
+}
+
+type Key uint64
+
+func (k Key) String() string {
+ return strconv.FormatUint(uint64(k), 10)
+}
diff --git a/weed/storage/needle/needle_value_map.go b/weed/storage/needle/needle_value_map.go
new file mode 100644
index 000000000..81f41b235
--- /dev/null
+++ b/weed/storage/needle/needle_value_map.go
@@ -0,0 +1,8 @@
+package needle
+
+type NeedleValueMap interface {
+ Set(key Key, offset, size uint32) (oldOffset, oldSize uint32)
+ Delete(key Key) uint32
+ Get(key Key) (*NeedleValue, bool)
+ Visit(visit func(NeedleValue) error) error
+}
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index 15a0387c5..14e4ccf3a 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -6,6 +6,7 @@ import (
"os"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -15,6 +16,7 @@ const (
NeedleMapInMemory NeedleMapType = iota
NeedleMapLevelDb
NeedleMapBoltDb
+ NeedleMapBtree
)
const (
@@ -23,7 +25,7 @@ const (
type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error
- Get(key uint64) (element *NeedleValue, ok bool)
+ Get(key uint64) (element *needle.NeedleValue, ok bool)
Delete(key uint64, offset uint32) error
Close()
Destroy() error
diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go
index e131ea822..cbcc786af 100644
--- a/weed/storage/needle_map_boltdb.go
+++ b/weed/storage/needle_map_boltdb.go
@@ -7,6 +7,7 @@ import (
"github.com/boltdb/bolt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -31,7 +32,7 @@ func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleM
return
}
glog.V(1).Infof("Loading %s...", indexFile.Name())
- nm, indexLoadError := LoadNeedleMap(indexFile)
+ nm, indexLoadError := LoadBtreeNeedleMap(indexFile)
if indexLoadError != nil {
return nil, indexLoadError
}
@@ -72,7 +73,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
})
}
-func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
bytes := make([]byte, 8)
var data []byte
util.Uint64toBytes(bytes, key)
@@ -91,7 +92,7 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
}
offset := util.BytesToUint32(data[0:4])
size := util.BytesToUint32(data[4:8])
- return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+ return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true
}
func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index f025ea360..bd2c7c886 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -6,6 +6,7 @@ import (
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -29,7 +30,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl
return
}
glog.V(1).Infof("Loading %s...", indexFile.Name())
- nm, indexLoadError := LoadNeedleMap(indexFile)
+ nm, indexLoadError := LoadBtreeNeedleMap(indexFile)
if indexLoadError != nil {
return nil, indexLoadError
}
@@ -70,7 +71,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
})
}
-func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+func (m *LevelDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key)
data, err := m.db.Get(bytes, nil)
@@ -79,7 +80,7 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
}
offset := util.BytesToUint32(data[0:4])
size := util.BytesToUint32(data[4:8])
- return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+ return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true
}
func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index ccbb21317..f34a57849 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -5,17 +5,26 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type NeedleMap struct {
- m CompactMap
+ m needle.NeedleValueMap
baseNeedleMapper
}
-func NewNeedleMap(file *os.File) *NeedleMap {
+func NewCompactNeedleMap(file *os.File) *NeedleMap {
nm := &NeedleMap{
- m: NewCompactMap(),
+ m: needle.NewCompactMap(),
+ }
+ nm.indexFile = file
+ return nm
+}
+
+func NewBtreeNeedleMap(file *os.File) *NeedleMap {
+ nm := &NeedleMap{
+ m: needle.NewBtreeMap(),
}
nm.indexFile = file
return nm
@@ -25,8 +34,17 @@ const (
RowsToRead = 1024
)
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
- nm := NewNeedleMap(file)
+func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
+ nm := NewCompactNeedleMap(file)
+ return doLoading(file, nm)
+}
+
+func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
+ nm := NewBtreeNeedleMap(file)
+ return doLoading(file, nm)
+}
+
+func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
if key > nm.MaximumFileKey {
nm.MaximumFileKey = key
@@ -34,14 +52,14 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
if offset > 0 && size != TombstoneFileSize {
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- oldOffset, oldSize := nm.m.Set(Key(key), offset, size)
+ oldOffset, oldSize := nm.m.Set(needle.Key(key), offset, size)
glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if oldOffset > 0 && oldSize != TombstoneFileSize {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
} else {
- oldSize := nm.m.Delete(Key(key))
+ oldSize := nm.m.Delete(needle.Key(key))
glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
@@ -84,16 +102,16 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
- _, oldSize := nm.m.Set(Key(key), offset, size)
+ _, oldSize := nm.m.Set(needle.Key(key), offset, size)
nm.logPut(key, oldSize, size)
return nm.appendToIndexFile(key, offset, size)
}
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- element, ok = nm.m.Get(Key(key))
+func (nm *NeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
+ element, ok = nm.m.Get(needle.Key(key))
return
}
func (nm *NeedleMap) Delete(key uint64, offset uint32) error {
- deletedBytes := nm.m.Delete(Key(key))
+ deletedBytes := nm.m.Delete(needle.Key(key))
nm.logDelete(deletedBytes)
return nm.appendToIndexFile(key, offset, TombstoneFileSize)
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 4be860987..457d50410 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -70,20 +70,25 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
switch needleMapKind {
case NeedleMapInMemory:
- glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
- if v.nm, e = LoadNeedleMap(indexFile); e != nil {
- glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+ glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
+ if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
}
case NeedleMapLevelDb:
- glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb", fileName+".ldb")
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
}
case NeedleMapBoltDb:
- glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
+ glog.V(0).Infoln("loading boltdb", fileName+".bdb")
if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
}
+ case NeedleMapBtree:
+ glog.V(0).Infoln("loading index", fileName+".idx", "to btree readonly", v.readOnly)
+ if v.nm, e = LoadBtreeNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s to btree error: %v", fileName+".idx", e)
+ }
}
}
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
index 23d8db510..d7cae8803 100644
--- a/weed/storage/volume_sync.go
+++ b/weed/storage/volume_sync.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -48,7 +49,7 @@ optimized more later).
func (v *Volume) Synchronize(volumeServer string) (err error) {
var lastCompactRevision uint16 = 0
var compactRevision uint16 = 0
- var masterMap CompactMap
+ var masterMap *needle.CompactMap
for i := 0; i < 3; i++ {
if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
@@ -69,7 +70,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) {
return
}
-type ByOffset []NeedleValue
+type ByOffset []needle.NeedleValue
func (a ByOffset) Len() int { return len(a) }
func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@@ -77,18 +78,18 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
// trySynchronizing sync with remote volume server incrementally by
// make up the local and remote delta.
-func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
+func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
slaveIdxFile, err := os.Open(v.nm.IndexFileName())
if err != nil {
return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
}
defer slaveIdxFile.Close()
- slaveMap, err := LoadNeedleMap(slaveIdxFile)
+ slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
if err != nil {
return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
}
- var delta []NeedleValue
- if err := masterMap.Visit(func(needleValue NeedleValue) error {
+ var delta []needle.NeedleValue
+ if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
if needleValue.Key == 0 {
return nil
}
@@ -100,7 +101,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com
}); err != nil {
return fmt.Errorf("Add master entry: %v", err)
}
- if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
+ if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
if needleValue.Key == 0 {
return nil
}
@@ -137,8 +138,8 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com
return nil
}
-func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
- m = NewCompactMap()
+func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
+ m = needle.NewCompactMap()
syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil {
@@ -149,9 +150,9 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize {
- m.Set(Key(key), offset, size)
+ m.Set(needle.Key(key), offset, size)
} else {
- m.Delete(Key(key))
+ m.Delete(needle.Key(key))
}
total++
})
@@ -178,7 +179,7 @@ func (v *Volume) IndexFileContent() ([]byte, error) {
}
// removeNeedle removes one needle by needle key
-func (v *Volume) removeNeedle(key Key) {
+func (v *Volume) removeNeedle(key needle.Key) {
n := new(Needle)
n.Id = uint64(key)
v.deleteNeedle(n)
@@ -188,7 +189,7 @@ func (v *Volume) removeNeedle(key Key) {
// The compact revision is checked first in case the remote volume
// is compacted and the offset is invalid any more.
func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
- needleValue NeedleValue, compactRevision uint16) error {
+ needleValue needle.NeedleValue, compactRevision uint16) error {
// add master file entry to local data file
values := make(url.Values)
values.Add("revision", strconv.Itoa(int(compactRevision)))
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 07916fe6b..f6f68d59b 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -221,7 +221,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
}
defer idx.Close()
- nm := NewNeedleMap(idx)
+ nm := NewBtreeNeedleMap(idx)
new_offset := int64(SuperBlockSize)
now := uint64(time.Now().Unix())
@@ -272,7 +272,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
}
defer oldIndexFile.Close()
- nm := NewNeedleMap(idx)
+ nm := NewBtreeNeedleMap(idx)
now := uint64(time.Now().Unix())
v.SuperBlock.CompactRevision++