aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/needle_map.go158
-rw-r--r--go/storage/needle_map_leveldb.go151
-rw-r--r--go/storage/needle_map_memory.go126
-rw-r--r--go/storage/store.go40
-rw-r--r--go/storage/volume.go83
-rw-r--r--go/storage/volume_vacuum.go4
6 files changed, 330 insertions, 232 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index f74191742..0bfa12180 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -2,15 +2,13 @@ package storage
import (
"fmt"
- "io"
"os"
- "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
)
type NeedleMapper interface {
- Put(key uint64, offset uint32, size uint32) (int, error)
+ Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool)
Delete(key uint64) error
Close()
@@ -19,7 +17,6 @@ type NeedleMapper interface {
DeletedSize() uint64
FileCount() int
DeletedCount() int
- Visit(visit func(NeedleValue) error) (err error)
MaxFileKey() uint64
}
@@ -31,146 +28,33 @@ type mapMetric struct {
MaximumFileKey uint64 `json:"MaxFileKey"`
}
-type NeedleMap struct {
- indexFile *os.File
- m CompactMap
-
- mapMetric
-}
-
-func NewNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
- m: NewCompactMap(),
- indexFile: file,
- }
- return nm
-}
-
-const (
- RowsToRead = 1024
-)
-
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
- nm := NewNeedleMap(file)
- e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
- if key > nm.MaximumFileKey {
- nm.MaximumFileKey = key
- }
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- } else {
- oldSize := nm.m.Delete(Key(key))
- glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- return nil
- })
- glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
- return nm, e
-}
-
-// walks through the index file, calls fn function with each key, offset, size
-// stops with the error returned by the fn function
-func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
- var readerOffset int64
- bytes := make([]byte, 16*RowsToRead)
- count, e := r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
- readerOffset += int64(count)
- var (
- key uint64
- offset, size uint32
- i int
- )
-
- for count > 0 && e == nil || e == io.EOF {
- for i = 0; i+16 <= count; i += 16 {
- key = util.BytesToUint64(bytes[i : i+8])
- offset = util.BytesToUint32(bytes[i+8 : i+12])
- size = util.BytesToUint32(bytes[i+12 : i+16])
- if e = fn(key, offset, size); e != nil {
- return e
- }
- }
- if e == io.EOF {
- return nil
- }
- count, e = r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
- readerOffset += int64(count)
- }
- return e
-}
-
-func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
- if key > nm.MaximumFileKey {
- nm.MaximumFileKey = key
- }
- oldSize := nm.m.Set(Key(key), offset, size)
+func appendToIndexFile(indexFile *os.File,
+ key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
+ if _, err := indexFile.Seek(0, 2); err != nil {
+ return fmt.Errorf("cannot seek end of indexfile %s: %v",
+ indexFile.Name(), err)
}
- return nm.indexFile.Write(bytes)
+ _, err := indexFile.Write(bytes)
+ return err
}
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- element, ok = nm.m.Get(Key(key))
- return
+
+func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+ mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
+ mm.DeletionCounter++
}
-func (nm *NeedleMap) Delete(key uint64) error {
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
- bytes := make([]byte, 16)
- util.Uint64toBytes(bytes[0:8], key)
- util.Uint32toBytes(bytes[8:12], 0)
- util.Uint32toBytes(bytes[12:16], 0)
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
+
+func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
+ if key > mm.MaximumFileKey {
+ mm.MaximumFileKey = key
}
- if _, err := nm.indexFile.Write(bytes); err != nil {
- return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err)
+ mm.FileCounter++
+ mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
+ if oldSize > 0 {
+ mm.DeletionCounter++
+ mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
}
- nm.DeletionCounter++
- return nil
-}
-func (nm *NeedleMap) Close() {
- _ = nm.indexFile.Close()
-}
-func (nm *NeedleMap) Destroy() error {
- nm.Close()
- return os.Remove(nm.indexFile.Name())
-}
-func (nm NeedleMap) ContentSize() uint64 {
- return nm.FileByteCounter
-}
-func (nm NeedleMap) DeletedSize() uint64 {
- return nm.DeletionByteCounter
-}
-func (nm NeedleMap) FileCount() int {
- return nm.FileCounter
-}
-func (nm NeedleMap) DeletedCount() int {
- return nm.DeletionCounter
-}
-func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
- return nm.m.Visit(visit)
-}
-func (nm NeedleMap) MaxFileKey() uint64 {
- return nm.MaximumFileKey
}
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
new file mode 100644
index 000000000..73595278d
--- /dev/null
+++ b/go/storage/needle_map_leveldb.go
@@ -0,0 +1,151 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type LevelDbNeedleMap struct {
+ dbFileName string
+ indexFile *os.File
+ db *leveldb.DB
+ mapMetric
+}
+
+func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
+ m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
+ if !isLevelDbFresh(dbFileName, indexFile) {
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ generateDbFile(dbFileName, indexFile)
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", dbFileName)
+ if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil {
+ return
+ }
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ nm, indexLoadError := LoadNeedleMap(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = nm.mapMetric
+ return
+}
+
+func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
+ if err != nil {
+ return false
+ }
+ defer dbLogFile.Close()
+ dbStat, dbStatErr := dbLogFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateDbFile(dbFileName string, indexFile *os.File) error {
+ db, err := leveldb.OpenFile(dbFileName, nil)
+ if err != nil {
+ return err
+ }
+ defer db.Close()
+ return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+ if offset > 0 {
+ levelDbWrite(db, key, offset, size)
+ } else {
+ levelDbDelete(db, key)
+ }
+ return nil
+ })
+}
+
+func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ data, err := m.db.Get(bytes, nil)
+ if err != nil || len(data) != 8 {
+ glog.V(0).Infof("Failed to get %d %v", key, err)
+ return nil, false
+ }
+ offset := util.BytesToUint32(data[0:4])
+ size := util.BytesToUint32(data[4:8])
+ return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ var oldSize uint32
+ if oldNeedle, ok := m.Get(key); ok {
+ oldSize = oldNeedle.Size
+ }
+ m.logPut(key, oldSize, size)
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
+ return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+ }
+ return levelDbWrite(m.db, key, offset, size)
+}
+
+func levelDbWrite(db *leveldb.DB,
+ key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+ if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
+ return fmt.Errorf("failed to write leveldb: %v", err)
+ }
+ return nil
+}
+func levelDbDelete(db *leveldb.DB, key uint64) error {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ return db.Delete(bytes, nil)
+}
+
+func (m *LevelDbNeedleMap) Delete(key uint64) error {
+ if oldNeedle, ok := m.Get(key); ok {
+ m.logDelete(oldNeedle.Size)
+ }
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
+ return err
+ }
+ return levelDbDelete(m.db, key)
+}
+
+func (m *LevelDbNeedleMap) Close() {
+ m.db.Close()
+}
+
+func (m *LevelDbNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.dbFileName)
+}
+
+func (m *LevelDbNeedleMap) ContentSize() uint64 {
+ return m.FileByteCounter
+}
+func (m *LevelDbNeedleMap) DeletedSize() uint64 {
+ return m.DeletionByteCounter
+}
+func (m *LevelDbNeedleMap) FileCount() int {
+ return m.FileCounter
+}
+func (m *LevelDbNeedleMap) DeletedCount() int {
+ return m.DeletionCounter
+}
+func (m *LevelDbNeedleMap) MaxFileKey() uint64 {
+ return m.MaximumFileKey
+}
diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go
new file mode 100644
index 000000000..5fce301bc
--- /dev/null
+++ b/go/storage/needle_map_memory.go
@@ -0,0 +1,126 @@
+package storage
+
+import (
+ "io"
+ "os"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+)
+
+type NeedleMap struct {
+ indexFile *os.File
+ m CompactMap
+
+ mapMetric
+}
+
+func NewNeedleMap(file *os.File) *NeedleMap {
+ nm := &NeedleMap{
+ m: NewCompactMap(),
+ indexFile: file,
+ }
+ return nm
+}
+
+const (
+ RowsToRead = 1024
+)
+
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
+ nm := NewNeedleMap(file)
+ e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
+ if key > nm.MaximumFileKey {
+ nm.MaximumFileKey = key
+ }
+ nm.FileCounter++
+ nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ oldSize := nm.m.Delete(Key(key))
+ glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ }
+ return nil
+ })
+ glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
+ return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
+ var readerOffset int64
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := r.ReadAt(bytes, readerOffset)
+ glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ readerOffset += int64(count)
+ var (
+ key uint64
+ offset, size uint32
+ i int
+ )
+
+ for count > 0 && e == nil || e == io.EOF {
+ for i = 0; i+16 <= count; i += 16 {
+ key = util.BytesToUint64(bytes[i : i+8])
+ offset = util.BytesToUint32(bytes[i+8 : i+12])
+ size = util.BytesToUint32(bytes[i+12 : i+16])
+ if e = fn(key, offset, size); e != nil {
+ return e
+ }
+ }
+ if e == io.EOF {
+ return nil
+ }
+ count, e = r.ReadAt(bytes, readerOffset)
+ glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ readerOffset += int64(count)
+ }
+ return e
+}
+
+func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ nm.logPut(key, oldSize, size)
+ return appendToIndexFile(nm.indexFile, key, offset, size)
+}
+func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ element, ok = nm.m.Get(Key(key))
+ return
+}
+func (nm *NeedleMap) Delete(key uint64) error {
+ deletedBytes := nm.m.Delete(Key(key))
+ nm.logDelete(deletedBytes)
+ return appendToIndexFile(nm.indexFile, key, 0, 0)
+}
+func (nm *NeedleMap) Close() {
+ _ = nm.indexFile.Close()
+}
+func (nm *NeedleMap) Destroy() error {
+ nm.Close()
+ return os.Remove(nm.indexFile.Name())
+}
+func (nm NeedleMap) ContentSize() uint64 {
+ return nm.FileByteCounter
+}
+func (nm NeedleMap) DeletedSize() uint64 {
+ return nm.DeletionByteCounter
+}
+func (nm NeedleMap) FileCount() int {
+ return nm.FileCounter
+}
+func (nm NeedleMap) DeletedCount() int {
+ return nm.DeletionCounter
+}
+
+func (nm NeedleMap) MaxFileKey() uint64 {
+ return nm.MaximumFileKey
+}
diff --git a/go/storage/store.go b/go/storage/store.go
index 2695537f6..65e5b218b 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume)
- location.loadExistingVolumes()
+ location.loadExistingVolumes(useLevelDb)
s.Locations = append(s.Locations, location)
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt, ttl)
+ e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil {
e = err
}
}
@@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
+ if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -195,20 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *Rep
return fmt.Errorf("No more free space left")
}
-func (s *Store) FreezeVolume(volumeIdString string) error {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
- }
- if v := s.findVolume(vid); v != nil {
- if v.readOnly {
- return fmt.Errorf("Volume %s is already read-only", volumeIdString)
- }
- return v.freeze()
- }
- return fmt.Errorf("volume id %d is not found during freeze", vid)
-}
-func (l *DiskLocation) loadExistingVolumes() {
+func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
@@ -221,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
@@ -261,7 +248,7 @@ func (s *Store) SetRack(rack string) {
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster)
}
-func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
+func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster()
if e != nil {
return
@@ -317,13 +304,16 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
return "", "", err
}
- jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
+ joinUrl := "http://" + masterNode + "/dir/join"
+
+ jsonBlob, err := util.PostBytes(joinUrl, data)
if err != nil {
s.masterNodes.reset()
return "", "", err
}
var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
return masterNode, "", err
}
if ret.Error != "" {
@@ -354,7 +344,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
}
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
- if _, _, e := s.Join(); e != nil {
+ if _, _, e := s.SendHeartbeatToMaster(); e != nil {
glog.V(0).Infoln("error when reporting size:", e)
}
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 1988c9aac..2b47fb497 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -27,10 +27,10 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
- e = v.load(true, true)
+ e = v.load(true, true, useLevelDb)
return
}
func (v *Volume) String() string {
@@ -40,7 +40,7 @@ func (v *Volume) String() string {
func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
- e = v.load(false, false)
+ e = v.load(false, false, false)
return
}
func (v *Volume) FileName() (fileName string) {
@@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) {
}
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
var e error
fileName := v.FileName()
@@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
e = v.maybeWriteSuperBlock()
}
if e == nil && alsoLoadIndex {
- if v.readOnly {
- if v.ensureConvertIdxToCdb(fileName) {
- v.nm, e = OpenCdbMap(fileName + ".cdb")
- return e
- }
- }
var indexFile *os.File
if v.readOnly {
glog.V(1).Infoln("open to read file", fileName+".idx")
@@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
- glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
- if v.nm, e = LoadNeedleMap(indexFile); e != nil {
- glog.V(0).Infoln("loading error:", e)
+ if !useLevelDb {
+ glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
+ if v.nm, e = LoadNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+ }
+ } else {
+ glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
}
}
return e
@@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
}
nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
- if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
+ if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
}
@@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) {
return -1, errors.New("Not Found")
}
-func (v *Volume) freeze() error {
- if v.readOnly {
- return nil
- }
- nm, ok := v.nm.(*NeedleMap)
- if !ok {
- return nil
- }
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- bn, _ := baseFilename(v.dataFile.Name())
- cdbFn := bn + ".cdb"
- glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
- err := DumpNeedleMapToCdb(cdbFn, nm)
- if err != nil {
- return err
- }
- if v.nm, err = OpenCdbMap(cdbFn); err != nil {
- return err
- }
- nm.indexFile.Close()
- os.Remove(nm.indexFile.Name())
- v.readOnly = true
- return nil
-}
-
func ScanVolumeFile(dirname string, collection string, id VolumeId,
visitSuperBlock func(SuperBlock) error,
readNeedleBody bool,
@@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
modTime = fi.ModTime()
return
}
-func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
- var indexFile *os.File
- var e error
- _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
- _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
- if cdbCanRead && cdbModTime.After(idxModeTime) {
- return true
- }
- if !cdbCanWrite {
- return false
- }
- if !idxCanRead {
- glog.V(0).Infoln("Can not read file", fileName+".idx!")
- return false
- }
- glog.V(2).Infoln("opening file", fileName+".idx")
- if indexFile, e = os.Open(fileName + ".idx"); e != nil {
- glog.V(0).Infoln("Failed to read file", fileName+".idx !")
- return false
- }
- defer indexFile.Close()
- glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
- if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
- glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
- return false
- }
- return true
-}
// volume is expired if modified time + volume ttl < now
// except when volume is empty
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7e026a61d..9f6f8e35f 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error {
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
- if e = v.load(true, false); e != nil {
+ if e = v.load(true, false, false); e != nil {
return e
}
return nil
@@ -73,7 +73,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
- if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
+ if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {