aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/storage/needle_map.go8
-rw-r--r--go/storage/needle_map_boltdb.go182
-rw-r--r--go/storage/needle_map_leveldb.go5
-rw-r--r--go/storage/store.go18
-rw-r--r--go/storage/volume.go38
-rw-r--r--go/storage/volume_vacuum.go41
-rw-r--r--go/weed/compact.go3
-rw-r--r--go/weed/export.go36
-rw-r--r--go/weed/fix.go28
-rw-r--r--go/weed/server.go12
-rw-r--r--go/weed/volume.go14
-rw-r--r--go/weed/weed_server/volume_server.go10
-rw-r--r--go/weed/weed_server/volume_server_handlers_admin.go2
13 files changed, 309 insertions, 88 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 0bfa12180..c05aae745 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -7,6 +7,14 @@ import (
"github.com/chrislusf/weed-fs/go/util"
)
+type NeedleMapType int
+
+const (
+ NeedleMapInMemory NeedleMapType = iota
+ NeedleMapLevelDb
+ NeedleMapBoltDb
+)
+
type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool)
diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go
new file mode 100644
index 000000000..bef10299a
--- /dev/null
+++ b/go/storage/needle_map_boltdb.go
@@ -0,0 +1,182 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/boltdb/bolt"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+)
+
+type BoltDbNeedleMap struct {
+ dbFileName string
+ indexFile *os.File
+ db *bolt.DB
+ mapMetric
+}
+
+var boltdbBucket = []byte("weed")
+
+func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
+ m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
+ if !isBoltDbFresh(dbFileName, indexFile) {
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ generateBoltDbFile(dbFileName, indexFile)
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", dbFileName)
+ if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil {
+ return
+ }
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ nm, indexLoadError := LoadNeedleMap(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = nm.mapMetric
+ return
+}
+
+func isBoltDbFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbLogFile, err := os.Open(dbFileName)
+ if err != nil {
+ return false
+ }
+ defer dbLogFile.Close()
+ dbStat, dbStatErr := dbLogFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
+ db, err := bolt.Open(dbFileName, 0644, nil)
+ if err != nil {
+ return err
+ }
+ defer db.Close()
+ return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+ if offset > 0 {
+ boltDbWrite(db, key, offset, size)
+ } else {
+ boltDbDelete(db, key)
+ }
+ return nil
+ })
+}
+
+func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ bytes := make([]byte, 8)
+ var data []byte
+ util.Uint64toBytes(bytes, key)
+ err := m.db.View(func(tx *bolt.Tx) error {
+ bucket := tx.Bucket(boltdbBucket)
+ if bucket == nil {
+ return fmt.Errorf("Bucket %q not found!", boltdbBucket)
+ }
+
+ data = bucket.Get(bytes)
+ return nil
+ })
+
+ if err != nil || len(data) != 8 {
+ glog.V(0).Infof("Failed to get %d %v", key, err)
+ return nil, false
+ }
+ offset := util.BytesToUint32(data[0:4])
+ size := util.BytesToUint32(data[4:8])
+ return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ var oldSize uint32
+ if oldNeedle, ok := m.Get(key); ok {
+ oldSize = oldNeedle.Size
+ }
+ m.logPut(key, oldSize, size)
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
+ return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+ }
+ return boltDbWrite(m.db, key, offset, size)
+}
+
+func boltDbWrite(db *bolt.DB,
+ key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+ return db.Update(func(tx *bolt.Tx) error {
+ bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
+ if err != nil {
+ return err
+ }
+
+ err = bucket.Put(bytes[0:8], bytes[8:16])
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+}
+func boltDbDelete(db *bolt.DB, key uint64) error {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ return db.Update(func(tx *bolt.Tx) error {
+ bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
+ if err != nil {
+ return err
+ }
+
+ err = bucket.Delete(bytes)
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+}
+
+func (m *BoltDbNeedleMap) Delete(key uint64) error {
+ if oldNeedle, ok := m.Get(key); ok {
+ m.logDelete(oldNeedle.Size)
+ }
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
+ return err
+ }
+ return boltDbDelete(m.db, key)
+}
+
+func (m *BoltDbNeedleMap) Close() {
+ m.db.Close()
+}
+
+func (m *BoltDbNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.dbFileName)
+}
+
+func (m *BoltDbNeedleMap) ContentSize() uint64 {
+ return m.FileByteCounter
+}
+func (m *BoltDbNeedleMap) DeletedSize() uint64 {
+ return m.DeletionByteCounter
+}
+func (m *BoltDbNeedleMap) FileCount() int {
+ return m.FileCounter
+}
+func (m *BoltDbNeedleMap) DeletedCount() int {
+ return m.DeletionCounter
+}
+func (m *BoltDbNeedleMap) MaxFileKey() uint64 {
+ return m.MaximumFileKey
+}
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
index 73595278d..32f763b4f 100644
--- a/go/storage/needle_map_leveldb.go
+++ b/go/storage/needle_map_leveldb.go
@@ -21,7 +21,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl
m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
if !isLevelDbFresh(dbFileName, indexFile) {
glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
- generateDbFile(dbFileName, indexFile)
+ generateLevelDbFile(dbFileName, indexFile)
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
}
glog.V(1).Infof("Opening %s...", dbFileName)
@@ -54,7 +54,7 @@ func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
return dbStat.ModTime().After(indexStat.ModTime())
}
-func generateDbFile(dbFileName string, indexFile *os.File) error {
+func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
db, err := leveldb.OpenFile(dbFileName, nil)
if err != nil {
return err
@@ -75,7 +75,6 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
util.Uint64toBytes(bytes, key)
data, err := m.db.Get(bytes, nil)
if err != nil || len(data) != 8 {
- glog.V(0).Infof("Failed to get %d %v", key, err)
return nil, false
}
offset := util.BytesToUint32(data[0:4])
diff --git a/go/storage/store.go b/go/storage/store.go
index 65e5b218b..eb44bd9d0 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) {
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume)
- location.loadExistingVolumes(useLevelDb)
+ location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl)
+ e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
e = err
}
}
@@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil {
+ if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -195,7 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, repl
return fmt.Errorf("No more free space left")
}
-func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
@@ -208,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 2b47fb497..e35eeee49 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -14,12 +14,13 @@ import (
)
type Volume struct {
- Id VolumeId
- dir string
- Collection string
- dataFile *os.File
- nm NeedleMapper
- readOnly bool
+ Id VolumeId
+ dir string
+ Collection string
+ dataFile *os.File
+ nm NeedleMapper
+ needleMapKind NeedleMapType
+ readOnly bool
SuperBlock
@@ -27,20 +28,22 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
- e = v.load(true, true, useLevelDb)
+ v.needleMapKind = needleMapKind
+ e = v.load(true, true, needleMapKind)
return
}
func (v *Volume) String() string {
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
}
-func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
- e = v.load(false, false, false)
+ v.needleMapKind = needleMapKind
+ e = v.load(false, false, needleMapKind)
return
}
func (v *Volume) FileName() (fileName string) {
@@ -51,7 +54,7 @@ func (v *Volume) FileName() (fileName string) {
}
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
var e error
fileName := v.FileName()
@@ -99,16 +102,22 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bo
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
- if !useLevelDb {
+ switch needleMapKind {
+ case NeedleMapInMemory:
glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
if v.nm, e = LoadNeedleMap(indexFile); e != nil {
glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
}
- } else {
+ case NeedleMapLevelDb:
glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
}
+ case NeedleMapBoltDb:
+ glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
+ if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
+ glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
+ }
}
}
return e
@@ -263,11 +272,12 @@ func (v *Volume) read(n *Needle) (int, error) {
}
func ScanVolumeFile(dirname string, collection string, id VolumeId,
+ needleMapKind NeedleMapType,
visitSuperBlock func(SuperBlock) error,
readNeedleBody bool,
visitNeedle func(n *Needle, offset int64) error) (err error) {
var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("Failed to load volume %d: %v", id, err)
}
if err = visitSuperBlock(v.SuperBlock); err != nil {
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 9f6f8e35f..eab138000 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error {
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
- if e = v.load(true, false, false); e != nil {
+ if e = v.load(true, false, v.needleMapKind); e != nil {
return e
}
return nil
@@ -63,27 +63,28 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
now := uint64(time.Now().Unix())
- err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
- _, err = dst.Write(superBlock.Bytes())
- return err
- }, true, func(n *Needle, offset int64) error {
- if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
- return nil
- }
- nv, ok := v.nm.Get(n.Id)
- glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
- if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
+ err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind,
+ func(superBlock SuperBlock) error {
+ _, err = dst.Write(superBlock.Bytes())
+ return err
+ }, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
}
- if _, err = n.Append(dst, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
+ nv, ok := v.nm.Get(n.Id)
+ glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
+ if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
+ if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
+ }
+ if _, err = n.Append(dst, v.Version()); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
+ }
+ new_offset += n.DiskSize()
+ glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
}
- new_offset += n.DiskSize()
- glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
- }
- return nil
- })
+ return nil
+ })
return
}
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 6ce55a609..a5ea8a529 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -33,7 +33,8 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, false, nil, nil)
+ v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
+ storage.NeedleMapInMemory, nil, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/go/weed/export.go b/go/weed/export.go
index 9e7012937..b120c2fa7 100644
--- a/go/weed/export.go
+++ b/go/weed/export.go
@@ -99,23 +99,25 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version
- err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error {
- version = superBlock.Version()
- return nil
- }, true, func(n *storage.Needle, offset int64) error {
- nv, ok := nm.Get(n.Id)
- glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v",
- n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv)
- if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset {
- return walker(vid, n, version)
- }
- if !ok {
- glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size)
- } else {
- glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size)
- }
- return nil
- })
+ err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid,
+ storage.NeedleMapInMemory,
+ func(superBlock storage.SuperBlock) error {
+ version = superBlock.Version()
+ return nil
+ }, true, func(n *storage.Needle, offset int64) error {
+ nv, ok := nm.Get(n.Id)
+ glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v",
+ n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv)
+ if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset {
+ return walker(vid, n, version)
+ }
+ if !ok {
+ glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size)
+ } else {
+ glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size)
+ }
+ return nil
+ })
if err != nil {
glog.Fatalf("Export Volume File [ERROR] %s\n", err)
}
diff --git a/go/weed/fix.go b/go/weed/fix.go
index d2cd40398..f51dc1bf2 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -47,19 +47,21 @@ func runFix(cmd *Command, args []string) bool {
defer nm.Close()
vid := storage.VolumeId(*fixVolumeId)
- err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, func(superBlock storage.SuperBlock) error {
- return nil
- }, false, func(n *storage.Needle, offset int64) error {
- glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped())
- if n.Size > 0 {
- pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
- glog.V(2).Infof("saved %d with error %v", n.Size, pe)
- } else {
- glog.V(2).Infof("skipping deleted file ...")
- return nm.Delete(n.Id)
- }
- return nil
- })
+ err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid,
+ storage.NeedleMapInMemory,
+ func(superBlock storage.SuperBlock) error {
+ return nil
+ }, false, func(n *storage.Needle, offset int64) error {
+ glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped())
+ if n.Size > 0 {
+ pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
+ glog.V(2).Infof("saved %d with error %v", n.Size, pe)
+ } else {
+ glog.V(2).Infof("skipping deleted file ...")
+ return nm.Delete(n.Id)
+ }
+ return nil
+ })
if err != nil {
glog.Fatalf("Export Volume File [ERROR] %s\n", err)
}
diff --git a/go/weed/server.go b/go/weed/server.go
index 71346de0a..39d02597b 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -11,6 +11,7 @@ import (
"time"
"github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
@@ -68,7 +69,7 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- volumeUseLevelDb = cmdServer.Flag.Bool("volume.leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.")
+ volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
@@ -233,10 +234,17 @@ func runServer(cmd *Command, args []string) bool {
if isSeperatedPublicPort {
publicVolumeMux = http.NewServeMux()
}
+ volumeNeedleMapKind := storage.NeedleMapInMemory
+ switch *volumeIndexType {
+ case "leveldb":
+ volumeNeedleMapKind = storage.NeedleMapLevelDb
+ case "boltdb":
+ volumeNeedleMapKind = storage.NeedleMapBoltDb
+ }
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *serverPublicUrl,
folders, maxCounts,
- *volumeUseLevelDb,
+ volumeNeedleMapKind,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation,
)
diff --git a/go/weed/volume.go b/go/weed/volume.go
index e2c6ebd94..2d3ecbb4d 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
)
@@ -32,7 +33,7 @@ type VolumeServerOptions struct {
dataCenter *string
rack *string
whiteList []string
- useLevelDb *bool
+ indexType *string
fixJpgOrientation *bool
}
@@ -49,7 +50,7 @@ func init() {
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
- v.useLevelDb = cmdVolume.Flag.Bool("leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.")
+ v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
}
@@ -115,10 +116,17 @@ func runVolume(cmd *Command, args []string) bool {
publicVolumeMux = http.NewServeMux()
}
+ volumeNeedleMapKind := storage.NeedleMapInMemory
+ switch *v.indexType {
+ case "leveldb":
+ volumeNeedleMapKind = storage.NeedleMapLevelDb
+ case "boltdb":
+ volumeNeedleMapKind = storage.NeedleMapBoltDb
+ }
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits,
- *v.useLevelDb,
+ volumeNeedleMapKind,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation,
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index d84b39808..69c944c99 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -20,14 +20,14 @@ type VolumeServer struct {
store *storage.Store
guard *security.Guard
- UseLevelDb bool
+ needleMapKind storage.NeedleMapType
FixJpgOrientation bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int,
- useLevelDb bool,
+ needleMapKind storage.NeedleMapType,
masterNode string, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
@@ -36,11 +36,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
- UseLevelDb: useLevelDb,
+ needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
}
vs.SetMasterNode(masterNode)
- vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.UseLevelDb)
+ vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, "")
@@ -77,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
glog.V(0).Infoln("Volume Server Connected with master at", master)
}
} else {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %+v: %v", vs, err)
if connected {
connected = false
}
diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go
index 0d70a757e..eb8222ff8 100644
--- a/go/weed/weed_server/volume_server_handlers_admin.go
+++ b/go/weed/weed_server/volume_server_handlers_admin.go
@@ -17,7 +17,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
- err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.UseLevelDb, r.FormValue("replication"), r.FormValue("ttl"))
+ err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {