-rw-r--r--  go/storage/needle_map.go                             | 158
-rw-r--r--  go/storage/needle_map_leveldb.go                     | 151
-rw-r--r--  go/storage/needle_map_memory.go                      | 126
-rw-r--r--  go/storage/store.go                                  |  40
-rw-r--r--  go/storage/volume.go                                 |  83
-rw-r--r--  go/storage/volume_vacuum.go                          |   4
-rw-r--r--  go/util/http_util.go                                 |   4
-rw-r--r--  go/weed/compact.go                                   |   2
-rw-r--r--  go/weed/fix.go                                       |   4
-rw-r--r--  go/weed/server.go                                    |   2
-rw-r--r--  go/weed/volume.go                                    |   3
-rw-r--r--  go/weed/weed_server/volume_server.go                 |  10
-rw-r--r--  go/weed/weed_server/volume_server_handlers_admin.go  |  13
13 files changed, 347 insertions(+), 253 deletions(-)
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index f74191742..0bfa12180 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -2,15 +2,13 @@ package storage
import (
"fmt"
- "io"
"os"
- "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
)
type NeedleMapper interface {
- Put(key uint64, offset uint32, size uint32) (int, error)
+ Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool)
Delete(key uint64) error
Close()
@@ -19,7 +17,6 @@ type NeedleMapper interface {
DeletedSize() uint64
FileCount() int
DeletedCount() int
- Visit(visit func(NeedleValue) error) (err error)
MaxFileKey() uint64
}
@@ -31,146 +28,33 @@ type mapMetric struct {
MaximumFileKey uint64 `json:"MaxFileKey"`
}
-type NeedleMap struct {
- indexFile *os.File
- m CompactMap
-
- mapMetric
-}
-
-func NewNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
- m: NewCompactMap(),
- indexFile: file,
- }
- return nm
-}
-
-const (
- RowsToRead = 1024
-)
-
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
- nm := NewNeedleMap(file)
- e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
- if key > nm.MaximumFileKey {
- nm.MaximumFileKey = key
- }
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- } else {
- oldSize := nm.m.Delete(Key(key))
- glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- return nil
- })
- glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
- return nm, e
-}
-
-// walks through the index file, calls fn function with each key, offset, size
-// stops with the error returned by the fn function
-func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
- var readerOffset int64
- bytes := make([]byte, 16*RowsToRead)
- count, e := r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
- readerOffset += int64(count)
- var (
- key uint64
- offset, size uint32
- i int
- )
-
- for count > 0 && e == nil || e == io.EOF {
- for i = 0; i+16 <= count; i += 16 {
- key = util.BytesToUint64(bytes[i : i+8])
- offset = util.BytesToUint32(bytes[i+8 : i+12])
- size = util.BytesToUint32(bytes[i+12 : i+16])
- if e = fn(key, offset, size); e != nil {
- return e
- }
- }
- if e == io.EOF {
- return nil
- }
- count, e = r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
- readerOffset += int64(count)
- }
- return e
-}
-
-func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
- if key > nm.MaximumFileKey {
- nm.MaximumFileKey = key
- }
- oldSize := nm.m.Set(Key(key), offset, size)
+func appendToIndexFile(indexFile *os.File,
+ key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
+ if _, err := indexFile.Seek(0, 2); err != nil {
+ return fmt.Errorf("cannot seek end of indexfile %s: %v",
+ indexFile.Name(), err)
}
- return nm.indexFile.Write(bytes)
+ _, err := indexFile.Write(bytes)
+ return err
}
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- element, ok = nm.m.Get(Key(key))
- return
+
+func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+ mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
+ mm.DeletionCounter++
}
-func (nm *NeedleMap) Delete(key uint64) error {
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
- bytes := make([]byte, 16)
- util.Uint64toBytes(bytes[0:8], key)
- util.Uint32toBytes(bytes[8:12], 0)
- util.Uint32toBytes(bytes[12:16], 0)
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
+
+func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
+ if key > mm.MaximumFileKey {
+ mm.MaximumFileKey = key
}
- if _, err := nm.indexFile.Write(bytes); err != nil {
- return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err)
+ mm.FileCounter++
+ mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
+ if oldSize > 0 {
+ mm.DeletionCounter++
+ mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
}
- nm.DeletionCounter++
- return nil
-}
-func (nm *NeedleMap) Close() {
- _ = nm.indexFile.Close()
-}
-func (nm *NeedleMap) Destroy() error {
- nm.Close()
- return os.Remove(nm.indexFile.Name())
-}
-func (nm NeedleMap) ContentSize() uint64 {
- return nm.FileByteCounter
-}
-func (nm NeedleMap) DeletedSize() uint64 {
- return nm.DeletionByteCounter
-}
-func (nm NeedleMap) FileCount() int {
- return nm.FileCounter
-}
-func (nm NeedleMap) DeletedCount() int {
- return nm.DeletionCounter
-}
-func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
- return nm.m.Visit(visit)
-}
-func (nm NeedleMap) MaxFileKey() uint64 {
- return nm.MaximumFileKey
}
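
After this change, needle_map.go keeps only the pieces shared by both backends: the NeedleMapper interface (whose Put now returns just an error), the mapMetric counters with their logPut/logDelete helpers, and appendToIndexFile, which persists one fixed-width record per mutation. For orientation, here is a minimal standalone sketch of that 16-byte .idx record layout; the helper names are illustrative only, and big-endian encoding is assumed to match util.Uint64toBytes/Uint32toBytes.

```go
// Sketch (not part of the commit): the fixed-width record appendToIndexFile
// writes -- 8-byte key, 4-byte offset, 4-byte size. Offset 0 marks a deletion.
package main

import (
	"encoding/binary"
	"fmt"
)

type indexEntry struct {
	Key    uint64
	Offset uint32 // in units of NeedlePaddingSize
	Size   uint32
}

func encodeIndexEntry(e indexEntry) []byte {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[0:8], e.Key)
	binary.BigEndian.PutUint32(b[8:12], e.Offset)
	binary.BigEndian.PutUint32(b[12:16], e.Size)
	return b
}

func decodeIndexEntry(b []byte) indexEntry {
	return indexEntry{
		Key:    binary.BigEndian.Uint64(b[0:8]),
		Offset: binary.BigEndian.Uint32(b[8:12]),
		Size:   binary.BigEndian.Uint32(b[12:16]),
	}
}

func main() {
	rec := encodeIndexEntry(indexEntry{Key: 42, Offset: 8, Size: 1024})
	fmt.Printf("% x -> %+v\n", rec, decodeIndexEntry(rec))
}
```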
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
new file mode 100644
index 000000000..73595278d
--- /dev/null
+++ b/go/storage/needle_map_leveldb.go
@@ -0,0 +1,151 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type LevelDbNeedleMap struct {
+ dbFileName string
+ indexFile *os.File
+ db *leveldb.DB
+ mapMetric
+}
+
+func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
+ m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
+ if !isLevelDbFresh(dbFileName, indexFile) {
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ generateDbFile(dbFileName, indexFile)
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", dbFileName)
+ if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil {
+ return
+ }
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ nm, indexLoadError := LoadNeedleMap(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = nm.mapMetric
+ return
+}
+
+func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
+ if err != nil {
+ return false
+ }
+ defer dbLogFile.Close()
+ dbStat, dbStatErr := dbLogFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateDbFile(dbFileName string, indexFile *os.File) error {
+ db, err := leveldb.OpenFile(dbFileName, nil)
+ if err != nil {
+ return err
+ }
+ defer db.Close()
+ return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+ if offset > 0 {
+ levelDbWrite(db, key, offset, size)
+ } else {
+ levelDbDelete(db, key)
+ }
+ return nil
+ })
+}
+
+func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ data, err := m.db.Get(bytes, nil)
+ if err != nil || len(data) != 8 {
+ glog.V(0).Infof("Failed to get %d %v", key, err)
+ return nil, false
+ }
+ offset := util.BytesToUint32(data[0:4])
+ size := util.BytesToUint32(data[4:8])
+ return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ var oldSize uint32
+ if oldNeedle, ok := m.Get(key); ok {
+ oldSize = oldNeedle.Size
+ }
+ m.logPut(key, oldSize, size)
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
+ return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+ }
+ return levelDbWrite(m.db, key, offset, size)
+}
+
+func levelDbWrite(db *leveldb.DB,
+ key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+ if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
+ return fmt.Errorf("failed to write leveldb: %v", err)
+ }
+ return nil
+}
+func levelDbDelete(db *leveldb.DB, key uint64) error {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ return db.Delete(bytes, nil)
+}
+
+func (m *LevelDbNeedleMap) Delete(key uint64) error {
+ if oldNeedle, ok := m.Get(key); ok {
+ m.logDelete(oldNeedle.Size)
+ }
+ // write to index file first
+ if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
+ return err
+ }
+ return levelDbDelete(m.db, key)
+}
+
+func (m *LevelDbNeedleMap) Close() {
+ m.db.Close()
+}
+
+func (m *LevelDbNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.dbFileName)
+}
+
+func (m *LevelDbNeedleMap) ContentSize() uint64 {
+ return m.FileByteCounter
+}
+func (m *LevelDbNeedleMap) DeletedSize() uint64 {
+ return m.DeletionByteCounter
+}
+func (m *LevelDbNeedleMap) FileCount() int {
+ return m.FileCounter
+}
+func (m *LevelDbNeedleMap) DeletedCount() int {
+ return m.DeletionCounter
+}
+func (m *LevelDbNeedleMap) MaxFileKey() uint64 {
+ return m.MaximumFileKey
+}
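
The new LevelDB-backed map keeps the .idx file as the first write target (both Put and Delete append there before touching LevelDB) and mirrors each live needle as an 8-byte key pointing to an 8-byte offset+size value. A rough standalone sketch of that key/value layout follows; the temp-directory path is illustrative, and big-endian encoding is assumed to match the util helpers used by levelDbWrite.

```go
// Sketch (not from the commit): the LevelDB key/value layout used by
// LevelDbNeedleMap -- key: 8-byte needle id, value: 4-byte offset + 4-byte size.
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	dir, err := os.MkdirTemp("", "needlemap-ldb")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	db, err := leveldb.OpenFile(dir, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Put: needle id -> (offset, size), as levelDbWrite does.
	key, offset, size := uint64(7), uint32(8), uint32(1024)
	k := make([]byte, 8)
	v := make([]byte, 8)
	binary.BigEndian.PutUint64(k, key)
	binary.BigEndian.PutUint32(v[0:4], offset)
	binary.BigEndian.PutUint32(v[4:8], size)
	if err := db.Put(k, v, nil); err != nil {
		log.Fatal(err)
	}

	// Get it back, as LevelDbNeedleMap.Get does.
	data, err := db.Get(k, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("offset:", binary.BigEndian.Uint32(data[0:4]),
		"size:", binary.BigEndian.Uint32(data[4:8]))
}
```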
diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go
new file mode 100644
index 000000000..5fce301bc
--- /dev/null
+++ b/go/storage/needle_map_memory.go
@@ -0,0 +1,126 @@
+package storage
+
+import (
+ "io"
+ "os"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+)
+
+type NeedleMap struct {
+ indexFile *os.File
+ m CompactMap
+
+ mapMetric
+}
+
+func NewNeedleMap(file *os.File) *NeedleMap {
+ nm := &NeedleMap{
+ m: NewCompactMap(),
+ indexFile: file,
+ }
+ return nm
+}
+
+const (
+ RowsToRead = 1024
+)
+
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
+ nm := NewNeedleMap(file)
+ e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
+ if key > nm.MaximumFileKey {
+ nm.MaximumFileKey = key
+ }
+ nm.FileCounter++
+ nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ oldSize := nm.m.Delete(Key(key))
+ glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ }
+ return nil
+ })
+ glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
+ return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
+ var readerOffset int64
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := r.ReadAt(bytes, readerOffset)
+ glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ readerOffset += int64(count)
+ var (
+ key uint64
+ offset, size uint32
+ i int
+ )
+
+ for count > 0 && e == nil || e == io.EOF {
+ for i = 0; i+16 <= count; i += 16 {
+ key = util.BytesToUint64(bytes[i : i+8])
+ offset = util.BytesToUint32(bytes[i+8 : i+12])
+ size = util.BytesToUint32(bytes[i+12 : i+16])
+ if e = fn(key, offset, size); e != nil {
+ return e
+ }
+ }
+ if e == io.EOF {
+ return nil
+ }
+ count, e = r.ReadAt(bytes, readerOffset)
+ glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ readerOffset += int64(count)
+ }
+ return e
+}
+
+func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ nm.logPut(key, oldSize, size)
+ return appendToIndexFile(nm.indexFile, key, offset, size)
+}
+func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ element, ok = nm.m.Get(Key(key))
+ return
+}
+func (nm *NeedleMap) Delete(key uint64) error {
+ deletedBytes := nm.m.Delete(Key(key))
+ nm.logDelete(deletedBytes)
+ return appendToIndexFile(nm.indexFile, key, 0, 0)
+}
+func (nm *NeedleMap) Close() {
+ _ = nm.indexFile.Close()
+}
+func (nm *NeedleMap) Destroy() error {
+ nm.Close()
+ return os.Remove(nm.indexFile.Name())
+}
+func (nm NeedleMap) ContentSize() uint64 {
+ return nm.FileByteCounter
+}
+func (nm NeedleMap) DeletedSize() uint64 {
+ return nm.DeletionByteCounter
+}
+func (nm NeedleMap) FileCount() int {
+ return nm.FileCounter
+}
+func (nm NeedleMap) DeletedCount() int {
+ return nm.DeletionCounter
+}
+
+func (nm NeedleMap) MaxFileKey() uint64 {
+ return nm.MaximumFileKey
+}
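
The in-memory CompactMap implementation, LoadNeedleMap, and WalkIndexFile move here unchanged apart from the slimmer Put/Delete bodies that now delegate to the shared metric and index-file helpers. As a hypothetical caller (not in the commit), walking an .idx file to tally live records versus deletion records looks like this; the file path is illustrative.

```go
// Sketch: counting entries in a volume index with storage.WalkIndexFile.
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/chrislusf/weed-fs/go/storage"
)

func main() {
	f, err := os.Open("1.idx") // path is illustrative
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	var live, deleted int
	err = storage.WalkIndexFile(f, func(key uint64, offset, size uint32) error {
		if offset > 0 {
			live++ // a write or overwrite record
		} else {
			deleted++ // offset 0 marks a deletion record
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("live records:", live, "deletion records:", deleted)
}
```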
diff --git a/go/storage/store.go b/go/storage/store.go
index 2695537f6..65e5b218b 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
location.volumes = make(map[VolumeId]*Volume)
- location.loadExistingVolumes()
+ location.loadExistingVolumes(useLevelDb)
s.Locations = append(s.Locations, location)
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt, ttl)
+ e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil {
e = err
}
}
@@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
+ if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -195,20 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *Rep
return fmt.Errorf("No more free space left")
}
-func (s *Store) FreezeVolume(volumeIdString string) error {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
- }
- if v := s.findVolume(vid); v != nil {
- if v.readOnly {
- return fmt.Errorf("Volume %s is already read-only", volumeIdString)
- }
- return v.freeze()
- }
- return fmt.Errorf("volume id %d is not found during freeze", vid)
-}
-func (l *DiskLocation) loadExistingVolumes() {
+func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
name := dir.Name()
@@ -221,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
@@ -261,7 +248,7 @@ func (s *Store) SetRack(rack string) {
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster)
}
-func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
+func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster()
if e != nil {
return
@@ -317,13 +304,16 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
return "", "", err
}
- jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
+ joinUrl := "http://" + masterNode + "/dir/join"
+
+ jsonBlob, err := util.PostBytes(joinUrl, data)
if err != nil {
s.masterNodes.reset()
return "", "", err
}
var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
return masterNode, "", err
}
if ret.Error != "" {
@@ -354,7 +344,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
}
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
- if _, _, e := s.Join(); e != nil {
+ if _, _, e := s.SendHeartbeatToMaster(); e != nil {
glog.V(0).Infoln("error when reporting size:", e)
}
}
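
In store.go the useLevelDb flag is threaded from NewStore through addVolume and loadExistingVolumes down to NewVolume, the cdb-era FreezeVolume path is dropped, and Join is renamed SendHeartbeatToMaster. A hedged sketch of the new call shape, assuming a master is reachable and that no further store setup is required for a bare heartbeat:

```go
// Sketch (not in the commit): constructing a Store with the new useLevelDb
// parameter and sending one heartbeat via the renamed method.
package main

import (
	"log"

	"github.com/chrislusf/weed-fs/go/storage"
)

func main() {
	s := storage.NewStore(8080, "127.0.0.1", "127.0.0.1:8080",
		[]string{"/tmp/weed-data"}, []int{7}, true /* useLevelDb */)
	s.SetBootstrapMaster("localhost:9333")

	if master, _, err := s.SendHeartbeatToMaster(); err != nil {
		log.Printf("heartbeat failed: %v", err)
	} else {
		log.Printf("joined master %s", master)
	}
}
```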
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 1988c9aac..2b47fb497 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -27,10 +27,10 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
- e = v.load(true, true)
+ e = v.load(true, true, useLevelDb)
return
}
func (v *Volume) String() string {
@@ -40,7 +40,7 @@ func (v *Volume) String() string {
func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
- e = v.load(false, false)
+ e = v.load(false, false, false)
return
}
func (v *Volume) FileName() (fileName string) {
@@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) {
}
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
var e error
fileName := v.FileName()
@@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
e = v.maybeWriteSuperBlock()
}
if e == nil && alsoLoadIndex {
- if v.readOnly {
- if v.ensureConvertIdxToCdb(fileName) {
- v.nm, e = OpenCdbMap(fileName + ".cdb")
- return e
- }
- }
var indexFile *os.File
if v.readOnly {
glog.V(1).Infoln("open to read file", fileName+".idx")
@@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
- glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
- if v.nm, e = LoadNeedleMap(indexFile); e != nil {
- glog.V(0).Infoln("loading error:", e)
+ if !useLevelDb {
+ glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
+ if v.nm, e = LoadNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+ }
+ } else {
+ glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
}
}
return e
@@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
}
nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
- if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
+ if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
}
@@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) {
return -1, errors.New("Not Found")
}
-func (v *Volume) freeze() error {
- if v.readOnly {
- return nil
- }
- nm, ok := v.nm.(*NeedleMap)
- if !ok {
- return nil
- }
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- bn, _ := baseFilename(v.dataFile.Name())
- cdbFn := bn + ".cdb"
- glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
- err := DumpNeedleMapToCdb(cdbFn, nm)
- if err != nil {
- return err
- }
- if v.nm, err = OpenCdbMap(cdbFn); err != nil {
- return err
- }
- nm.indexFile.Close()
- os.Remove(nm.indexFile.Name())
- v.readOnly = true
- return nil
-}
-
func ScanVolumeFile(dirname string, collection string, id VolumeId,
visitSuperBlock func(SuperBlock) error,
readNeedleBody bool,
@@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
modTime = fi.ModTime()
return
}
-func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
- var indexFile *os.File
- var e error
- _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
- _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
- if cdbCanRead && cdbModTime.After(idxModeTime) {
- return true
- }
- if !cdbCanWrite {
- return false
- }
- if !idxCanRead {
- glog.V(0).Infoln("Can not read file", fileName+".idx!")
- return false
- }
- glog.V(2).Infoln("opening file", fileName+".idx")
- if indexFile, e = os.Open(fileName + ".idx"); e != nil {
- glog.V(0).Infoln("Failed to read file", fileName+".idx !")
- return false
- }
- defer indexFile.Close()
- glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
- if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
- glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
- return false
- }
- return true
-}
// volume is expired if modified time + volume ttl < now
// except when volume is empty
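
Volume loading now takes a useLevelDb flag and picks either LoadNeedleMap (in-memory CompactMap) or NewLevelDbNeedleMap for the index; the read-only cdb conversion path and freeze() are removed. Illustrative use of the extended NewVolume signature, with the data directory and volume id chosen for the example:

```go
// Sketch (not in the commit): loading a volume with the LevelDB-backed
// needle map selected by the new boolean parameter.
package main

import (
	"log"

	"github.com/chrislusf/weed-fs/go/storage"
)

func main() {
	vid := storage.VolumeId(1)
	useLevelDb := true
	v, err := storage.NewVolume("/tmp/weed-data", "", vid, useLevelDb, nil, nil)
	if err != nil {
		log.Fatalf("load volume: %v", err)
	}
	log.Printf("volume %s loaded, content size %d bytes", v, v.ContentSize())
}
```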
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7e026a61d..9f6f8e35f 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error {
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
- if e = v.load(true, false); e != nil {
+ if e = v.load(true, false, false); e != nil {
return e
}
return nil
@@ -73,7 +73,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
- if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
+ if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {
diff --git a/go/util/http_util.go b/go/util/http_util.go
index 72cab76e1..52579d746 100644
--- a/go/util/http_util.go
+++ b/go/util/http_util.go
@@ -26,12 +26,12 @@ func init() {
func PostBytes(url string, body []byte) ([]byte, error) {
r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("Post to %s: %v", url, err)
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("Read response body: %v", err)
}
return b, nil
}
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 71c4ea90f..6ce55a609 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil)
+ v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, false, nil, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/go/weed/fix.go b/go/weed/fix.go
index b2df07554..d2cd40398 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -52,8 +52,8 @@ func runFix(cmd *Command, args []string) bool {
}, false, func(n *storage.Needle, offset int64) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped())
if n.Size > 0 {
- count, pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
- glog.V(2).Infof("saved %d with error %v", count, pe)
+ pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
+ glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
return nm.Delete(n.Id)
diff --git a/go/weed/server.go b/go/weed/server.go
index 48612a27b..71346de0a 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -68,6 +68,7 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ volumeUseLevelDb = cmdServer.Flag.Bool("volume.leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.")
volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
@@ -235,6 +236,7 @@ func runServer(cmd *Command, args []string) bool {
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *serverPublicUrl,
folders, maxCounts,
+ *volumeUseLevelDb,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation,
)
diff --git a/go/weed/volume.go b/go/weed/volume.go
index aa2643d20..e2c6ebd94 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -32,6 +32,7 @@ type VolumeServerOptions struct {
dataCenter *string
rack *string
whiteList []string
+ useLevelDb *bool
fixJpgOrientation *bool
}
@@ -48,6 +49,7 @@ func init() {
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
+ v.useLevelDb = cmdVolume.Flag.Bool("leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
}
@@ -116,6 +118,7 @@ func runVolume(cmd *Command, args []string) bool {
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits,
+ *v.useLevelDb,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation,
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index e3878fac4..d84b39808 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -20,12 +20,14 @@ type VolumeServer struct {
store *storage.Store
guard *security.Guard
+ UseLevelDb bool
FixJpgOrientation bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int,
+ useLevelDb bool,
masterNode string, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
@@ -34,10 +36,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
+ UseLevelDb: useLevelDb,
FixJpgOrientation: fixJpgOrientation,
}
vs.SetMasterNode(masterNode)
- vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
+ vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.UseLevelDb)
vs.guard = security.NewGuard(whiteList, "")
@@ -47,7 +50,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
- adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
@@ -66,7 +68,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
- master, secretKey, err := vs.store.Join()
+ master, secretKey, err := vs.store.SendHeartbeatToMaster()
if err == nil {
if !connected {
connected = true
@@ -75,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
glog.V(0).Infoln("Volume Server Connected with master at", master)
}
} else {
- glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs, err)
if connected {
connected = false
}
diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go
index c84b72db0..0d70a757e 100644
--- a/go/weed/weed_server/volume_server_handlers_admin.go
+++ b/go/weed/weed_server/volume_server_handlers_admin.go
@@ -17,7 +17,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
- err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
+ err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.UseLevelDb, r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
@@ -40,17 +40,6 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R
glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
}
-func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
- //TODO: notify master that this volume will be read-only
- err := vs.store.FreezeVolume(r.FormValue("volume"))
- if err == nil {
- writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
- } else {
- writeJsonError(w, r, http.StatusInternalServerError, err)
- }
- glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err)
-}
-
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION