aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/needle_map.go8
-rw-r--r--go/storage/needle_map_boltdb.go182
-rw-r--r--go/storage/needle_map_leveldb.go5
-rw-r--r--go/storage/store.go18
-rw-r--r--go/storage/volume.go38
-rw-r--r--go/storage/volume_vacuum.go41
6 files changed, 246 insertions, 46 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 0bfa12180..c05aae745 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -7,6 +7,14 @@ import (
"github.com/chrislusf/weed-fs/go/util"
)
+type NeedleMapType int
+
+const (
+ NeedleMapInMemory NeedleMapType = iota
+ NeedleMapLevelDb
+ NeedleMapBoltDb
+)
+
type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool)
diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go
new file mode 100644
index 000000000..bef10299a
--- /dev/null
+++ b/go/storage/needle_map_boltdb.go
@@ -0,0 +1,182 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/boltdb/bolt"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+)
+
+type BoltDbNeedleMap struct {
+ dbFileName string
+ indexFile *os.File
+ db *bolt.DB
+ mapMetric
+}
+
+var boltdbBucket = []byte("weed")
+
+func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
+ m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
+ if !isBoltDbFresh(dbFileName, indexFile) {
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ generateBoltDbFile(dbFileName, indexFile)
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", dbFileName)
+ if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil {
+ return
+ }
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ nm, indexLoadError := LoadNeedleMap(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = nm.mapMetric
+ return
+}
+
+func isBoltDbFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbLogFile, err := os.Open(dbFileName)
+ if err != nil {
+ return false
+ }
+ defer dbLogFile.Close()
+ dbStat, dbStatErr := dbLogFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
+ db, err := bolt.Open(dbFileName, 0644, nil)
+ if err != nil {
+ return err
+ }
+ defer db.Close()
+ return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+ if offset > 0 {
+ boltDbWrite(db, key, offset, size)
+ } else {
+ boltDbDelete(db, key)
+ }
+ return nil
+ })
+}
+
+func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ bytes := make([]byte, 8)
+ var data []byte
+ util.Uint64toBytes(bytes, key)
+ err := m.db.View(func(tx *bolt.Tx) error {
+ bucket := tx.Bucket(boltdbBucket)
+ if bucket == nil {
+ return fmt.Errorf("Bucket %q not found!", boltdbBucket)
+ }
+
+ data = bucket.Get(bytes)
+ return nil
+ })
+
+ if err != nil || len(data) != 8 {
+ glog.V(0).Infof("Failed to get %d %v", key, err)
+ return nil, false
+ }
+ offset := util.BytesToUint32(data[0:4])
+ size := util.BytesToUint32(data[4:8])
+ return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ var oldSize uint32
+ if oldNeedle, ok := m.Get(key); ok {
+ oldSize = oldNeedle.Size
+ }
+ m.logPut(key, oldSize, size)
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
+ return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+ }
+ return boltDbWrite(m.db, key, offset, size)
+}
+
+func boltDbWrite(db *bolt.DB,
+ key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+ return db.Update(func(tx *bolt.Tx) error {
+ bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
+ if err != nil {
+ return err
+ }
+
+ err = bucket.Put(bytes[0:8], bytes[8:16])
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+}
+func boltDbDelete(db *bolt.DB, key uint64) error {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ return db.Update(func(tx *bolt.Tx) error {
+ bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
+ if err != nil {
+ return err
+ }
+
+ err = bucket.Delete(bytes)
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+}
+
+func (m *BoltDbNeedleMap) Delete(key uint64) error {
+ if oldNeedle, ok := m.Get(key); ok {
+ m.logDelete(oldNeedle.Size)
+ }
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
+ return err
+ }
+ return boltDbDelete(m.db, key)
+}
+
+func (m *BoltDbNeedleMap) Close() {
+ m.db.Close()
+}
+
+func (m *BoltDbNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.dbFileName)
+}
+
+func (m *BoltDbNeedleMap) ContentSize() uint64 {
+ return m.FileByteCounter
+}
+func (m *BoltDbNeedleMap) DeletedSize() uint64 {
+ return m.DeletionByteCounter
+}
+func (m *BoltDbNeedleMap) FileCount() int {
+ return m.FileCounter
+}
+func (m *BoltDbNeedleMap) DeletedCount() int {
+ return m.DeletionCounter
+}
+func (m *BoltDbNeedleMap) MaxFileKey() uint64 {
+ return m.MaximumFileKey
+}
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
index 73595278d..32f763b4f 100644
--- a/go/storage/needle_map_leveldb.go
+++ b/go/storage/needle_map_leveldb.go
@@ -21,7 +21,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl
m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
if !isLevelDbFresh(dbFileName, indexFile) {
glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
- generateDbFile(dbFileName, indexFile)
+ generateLevelDbFile(dbFileName, indexFile)
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
}
glog.V(1).Infof("Opening %s...", dbFileName)
@@ -54,7 +54,7 @@ func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
return dbStat.ModTime().After(indexStat.ModTime())
}
-func generateDbFile(dbFileName string, indexFile *os.File) error {
+func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
db, err := leveldb.OpenFile(dbFileName, nil)
if err != nil {
return err
@@ -75,7 +75,6 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
util.Uint64toBytes(bytes, key)
data, err := m.db.Get(bytes, nil)
if err != nil || len(data) != 8 {
- glog.V(0).Infof("Failed to get %d %v", key, err)
return nil, false
}
offset := util.BytesToUint32(data[0:4])
diff --git a/go/storage/store.go b/go/storage/store.go
index 65e5b218b..eb44bd9d0 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) {
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume)
- location.loadExistingVolumes(useLevelDb)
+ location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl)
+ e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
e = err
}
}
@@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil {
+ if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -195,7 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, repl
return fmt.Errorf("No more free space left")
}
-func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
@@ -208,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 2b47fb497..e35eeee49 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -14,12 +14,13 @@ import (
)
type Volume struct {
- Id VolumeId
- dir string
- Collection string
- dataFile *os.File
- nm NeedleMapper
- readOnly bool
+ Id VolumeId
+ dir string
+ Collection string
+ dataFile *os.File
+ nm NeedleMapper
+ needleMapKind NeedleMapType
+ readOnly bool
SuperBlock
@@ -27,20 +28,22 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
- e = v.load(true, true, useLevelDb)
+ v.needleMapKind = needleMapKind
+ e = v.load(true, true, needleMapKind)
return
}
func (v *Volume) String() string {
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
}
-func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
- e = v.load(false, false, false)
+ v.needleMapKind = needleMapKind
+ e = v.load(false, false, needleMapKind)
return
}
func (v *Volume) FileName() (fileName string) {
@@ -51,7 +54,7 @@ func (v *Volume) FileName() (fileName string) {
}
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
var e error
fileName := v.FileName()
@@ -99,16 +102,22 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bo
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
- if !useLevelDb {
+ switch needleMapKind {
+ case NeedleMapInMemory:
glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
if v.nm, e = LoadNeedleMap(indexFile); e != nil {
glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
}
- } else {
+ case NeedleMapLevelDb:
glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
}
+ case NeedleMapBoltDb:
+ glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
+ if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
+ glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
+ }
}
}
return e
@@ -263,11 +272,12 @@ func (v *Volume) read(n *Needle) (int, error) {
}
func ScanVolumeFile(dirname string, collection string, id VolumeId,
+ needleMapKind NeedleMapType,
visitSuperBlock func(SuperBlock) error,
readNeedleBody bool,
visitNeedle func(n *Needle, offset int64) error) (err error) {
var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("Failed to load volume %d: %v", id, err)
}
if err = visitSuperBlock(v.SuperBlock); err != nil {
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 9f6f8e35f..eab138000 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error {
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
- if e = v.load(true, false, false); e != nil {
+ if e = v.load(true, false, v.needleMapKind); e != nil {
return e
}
return nil
@@ -63,27 +63,28 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
now := uint64(time.Now().Unix())
- err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
- _, err = dst.Write(superBlock.Bytes())
- return err
- }, true, func(n *Needle, offset int64) error {
- if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
- return nil
- }
- nv, ok := v.nm.Get(n.Id)
- glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
- if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
+ err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind,
+ func(superBlock SuperBlock) error {
+ _, err = dst.Write(superBlock.Bytes())
+ return err
+ }, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
}
- if _, err = n.Append(dst, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
+ nv, ok := v.nm.Get(n.Id)
+ glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
+ if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
+ if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
+ }
+ if _, err = n.Append(dst, v.Version()); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
+ }
+ new_offset += n.DiskSize()
+ glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
}
- new_offset += n.DiskSize()
- glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
- }
- return nil
- })
+ return nil
+ })
return
}