aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-12-24 10:18:56 -0800
committerChris Lu <chris.lu@gmail.com>2019-12-24 10:18:56 -0800
commitabffe857a1cf6e6fac5a23ea2cd2a00d5744d9fb (patch)
treea89c3f90c5d5e7de306ce232cc11b64597e0b0ac
parent72a561ab7c021db516de69257f47f3306fd7b7ed (diff)
downloadseaweedfs-abffe857a1cf6e6fac5a23ea2cd2a00d5744d9fb.tar.xz
seaweedfs-abffe857a1cf6e6fac5a23ea2cd2a00d5744d9fb.zip
change btree map to in memory level db
-rw-r--r--weed/command/export.go14
-rw-r--r--weed/command/fix.go10
-rw-r--r--weed/storage/needle_map/btree_map.go53
-rw-r--r--weed/storage/needle_map/memdb.go112
-rw-r--r--weed/storage/needle_map_memory.go13
-rw-r--r--weed/storage/needle_map_metric_test.go30
-rw-r--r--weed/storage/volume_vacuum.go32
7 files changed, 138 insertions, 126 deletions
diff --git a/weed/command/export.go b/weed/command/export.go
index a27b88c64..8d664ad3b 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -16,6 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -89,7 +90,7 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version,
type VolumeFileScanner4Export struct {
version needle.Version
counter int
- needleMap *storage.NeedleMap
+ needleMap *needle_map.MemDb
vid needle.VolumeId
}
@@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool {
fileName = *export.collection + "_" + fileName
}
vid := needle.VolumeId(*export.volumeId)
- indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
- if err != nil {
- glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
+ needleMap := needle_map.NewMemDb()
+ if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
+ glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
volumeFileScanner := &VolumeFileScanner4Export{
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 9acf1801f..76bc19f7e 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -32,7 +33,7 @@ var (
type VolumeFileScanner4Fix struct {
version needle.Version
- nm *storage.NeedleMap
+ nm *needle_map.MemDb
}
func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
@@ -47,11 +48,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
- pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)
+ pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
- return scanner.nm.Delete(n.Id, types.ToOffset(offset))
+ return scanner.nm.Delete(n.Id)
}
return nil
}
@@ -73,8 +74,7 @@ func runFix(cmd *Command, args []string) bool {
}
defer indexFile.Close()
- nm := storage.NewBtreeNeedleMap(indexFile)
- defer nm.Close()
+ nm := needle_map.NewMemDb()
vid := needle.VolumeId(*fixVolumeId)
scanner := &VolumeFileScanner4Fix{
diff --git a/weed/storage/needle_map/btree_map.go b/weed/storage/needle_map/btree_map.go
deleted file mode 100644
index a26c5e068..000000000
--- a/weed/storage/needle_map/btree_map.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package needle_map
-
-import (
- . "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/google/btree"
-)
-
-//This map assumes mostly inserting increasing keys
-type BtreeMap struct {
- tree *btree.BTree
-}
-
-func NewBtreeMap() *BtreeMap {
- return &BtreeMap{
- tree: btree.New(32),
- }
-}
-
-func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
- found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size})
- if found != nil {
- old := found.(NeedleValue)
- return old.Offset, old.Size
- }
- return
-}
-
-func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) {
- found := cm.tree.Delete(NeedleValue{key, Offset{}, 0})
- if found != nil {
- old := found.(NeedleValue)
- return old.Size
- }
- return
-}
-func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) {
- found := cm.tree.Get(NeedleValue{key, Offset{}, 0})
- if found != nil {
- old := found.(NeedleValue)
- return &old, true
- }
- return nil, false
-}
-
-// Visit visits all entries or stop if any error when visiting
-func (cm *BtreeMap) AscendingVisit(visit func(NeedleValue) error) (ret error) {
- cm.tree.Ascend(func(item btree.Item) bool {
- needle := item.(NeedleValue)
- ret = visit(needle)
- return ret == nil
- })
- return ret
-}
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
new file mode 100644
index 000000000..6aba6adeb
--- /dev/null
+++ b/weed/storage/needle_map/memdb.go
@@ -0,0 +1,112 @@
+package needle_map
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+//This map uses in memory level db
+type MemDb struct {
+ db *leveldb.DB
+}
+
+func NewMemDb() *MemDb {
+ opts := &opt.Options{}
+
+ var err error
+ t := &MemDb{}
+ if t.db, err = leveldb.Open(storage.NewMemStorage(), opts); err != nil {
+ glog.V(0).Infof("MemDb fails to open: %v", err)
+ return nil
+ }
+
+ return t
+}
+
+func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error {
+
+ bytes := ToBytes(key, offset, size)
+
+ if err := cm.db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
+ return fmt.Errorf("failed to write temp leveldb: %v", err)
+ }
+ return nil
+}
+
+func (cm *MemDb) Delete(key NeedleId) error {
+ bytes := make([]byte, NeedleIdSize)
+ NeedleIdToBytes(bytes, key)
+ return cm.db.Delete(bytes, nil)
+
+}
+func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) {
+ bytes := make([]byte, NeedleIdSize)
+ NeedleIdToBytes(bytes[0:NeedleIdSize], key)
+ data, err := cm.db.Get(bytes, nil)
+ if err != nil || len(data) != OffsetSize+SizeSize {
+ return nil, false
+ }
+ offset := BytesToOffset(data[0:OffsetSize])
+ size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ return &NeedleValue{Key: key, Offset: offset, Size: size}, true
+}
+
+// Visit visits all entries or stop if any error when visiting
+func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
+ iter := cm.db.NewIterator(nil, nil)
+ for iter.Next() {
+ key := BytesToNeedleId(iter.Key())
+ data := iter.Value()
+ offset := BytesToOffset(data[0:OffsetSize])
+ size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+
+ needle := NeedleValue{Key: key, Offset: offset, Size: size}
+ ret = visit(needle)
+ if ret != nil {
+ return
+ }
+ }
+ iter.Release()
+ ret = iter.Error()
+
+ return
+}
+
+func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
+ idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return
+ }
+ defer idxFile.Close()
+
+ return cm.AscendingVisit(func(value NeedleValue) error {
+ _, err := idxFile.Write(value.ToBytes())
+ return err
+ })
+
+}
+
+func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
+ idxFile, err := os.OpenFile(idxName, os.O_RDONLY, 0644)
+ if err != nil {
+ return
+ }
+ defer idxFile.Close()
+
+ return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
+ if offset.IsZero() || size == TombstoneFileSize {
+ return nil
+ }
+ return cm.Set(key, offset, size)
+ })
+
+}
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index ee639a7e6..e4273f1b2 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -22,24 +22,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
return nm
}
-func NewBtreeNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
- m: needle_map.NewBtreeMap(),
- }
- nm.indexFile = file
- return nm
-}
-
func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewCompactNeedleMap(file)
return doLoading(file, nm)
}
-func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
- nm := NewBtreeNeedleMap(file)
- return doLoading(file, nm)
-}
-
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
nm.MaybeSetMaxFileKey(key)
diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go
deleted file mode 100644
index 539f83a87..000000000
--- a/weed/storage/needle_map_metric_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package storage
-
-import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- . "github.com/chrislusf/seaweedfs/weed/storage/types"
- "io/ioutil"
- "math/rand"
- "testing"
-)
-
-func TestFastLoadingNeedleMapMetrics(t *testing.T) {
-
- idxFile, _ := ioutil.TempFile("", "tmp.idx")
- nm := NewBtreeNeedleMap(idxFile)
-
- for i := 0; i < 10000; i++ {
- nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1))
- if rand.Float32() < 0.2 {
- nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0)))
- }
- }
-
- mm, _ := newNeedleMapMetricFromIndexFile(idxFile)
-
- glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount())
- glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize())
- glog.V(0).Infof("ContentSize expected %d actual %d", nm.ContentSize(), mm.ContentSize())
- glog.V(0).Infof("DeletedCount expected %d actual %d", nm.DeletedCount(), mm.DeletedCount())
- glog.V(0).Infof("MaxFileKey expected %d actual %d", nm.MaxFileKey(), mm.MaxFileKey())
-}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index b3dcdbd9d..704f1f4ef 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -115,6 +115,7 @@ func (v *Volume) CommitCompact() error {
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
return e
}
+ return nil
}
func (v *Volume) cleanupCompact() error {
@@ -270,7 +271,7 @@ type VolumeFileScanner4Vacuum struct {
version needle.Version
v *Volume
dstBackend backend.BackendStorageFile
- nm *NeedleMap
+ nm *needle_map.MemDb
newOffset int64
now uint64
writeThrottler *util.WriteThrottler
@@ -295,7 +296,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
- if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
+ if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
@@ -312,32 +313,33 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
dst backend.BackendStorageFile
- idx *os.File
)
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dst.Close()
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer idx.Close()
+ nm := needle_map.NewMemDb()
scanner := &VolumeFileScanner4Vacuum{
v: v,
now: uint64(time.Now().Unix()),
- nm: NewBtreeNeedleMap(idx),
+ nm: nm,
dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
+ if err != nil {
+ return nil
+ }
+
+ err = nm.SaveToIdx(idxName)
return
}
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
var (
- dst, idx, oldIndexFile *os.File
+ dst, oldIndexFile *os.File
)
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
@@ -345,17 +347,13 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
dstDatBackend := backend.NewDiskFile(dst)
defer dstDatBackend.Close()
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer idx.Close()
-
if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
return
}
defer oldIndexFile.Close()
- nm := NewBtreeNeedleMap(idx)
+ nm := needle_map.NewMemDb()
+
now := uint64(time.Now().Unix())
v.SuperBlock.CompactionRevision++
@@ -384,7 +382,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if nv.Offset == offset && nv.Size > 0 {
- if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
+ if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
@@ -396,5 +394,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
return nil
})
+ nm.SaveToIdx(idxName)
+
return
}