diff options
| author | chrislusf <chris.lu@gmail.com> | 2015-03-27 16:34:58 -0700 |
|---|---|---|
| committer | chrislusf <chris.lu@gmail.com> | 2015-03-27 16:34:58 -0700 |
| commit | 020ba6c9a85efd9f2bf5f3c7ce96b38857f2128e (patch) | |
| tree | 2304a6f6460dd3cc80249cc6877c422e1c951d5d /go/storage | |
| parent | add99ed57efb52416ee8931fa5a67e706ce089a2 (diff) | |
| download | seaweedfs-020ba6c9a85efd9f2bf5f3c7ce96b38857f2128e.tar.xz seaweedfs-020ba6c9a85efd9f2bf5f3c7ce96b38857f2128e.zip | |
add leveldb support for needle map
This supposedly should reduce memory consumption. However, for tests
with millions of entries, this shows higher memory consumption. Need to
see whether this will work out. If not, boltdb will be tested later.
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/needle_map.go | 158 | ||||
| -rw-r--r-- | go/storage/needle_map_leveldb.go | 151 | ||||
| -rw-r--r-- | go/storage/needle_map_memory.go | 126 | ||||
| -rw-r--r-- | go/storage/store.go | 40 | ||||
| -rw-r--r-- | go/storage/volume.go | 83 | ||||
| -rw-r--r-- | go/storage/volume_vacuum.go | 4 |
6 files changed, 330 insertions, 232 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index f74191742..0bfa12180 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -2,15 +2,13 @@ package storage import ( "fmt" - "io" "os" - "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/util" ) type NeedleMapper interface { - Put(key uint64, offset uint32, size uint32) (int, error) + Put(key uint64, offset uint32, size uint32) error Get(key uint64) (element *NeedleValue, ok bool) Delete(key uint64) error Close() @@ -19,7 +17,6 @@ type NeedleMapper interface { DeletedSize() uint64 FileCount() int DeletedCount() int - Visit(visit func(NeedleValue) error) (err error) MaxFileKey() uint64 } @@ -31,146 +28,33 @@ type mapMetric struct { MaximumFileKey uint64 `json:"MaxFileKey"` } -type NeedleMap struct { - indexFile *os.File - m CompactMap - - mapMetric -} - -func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ - m: NewCompactMap(), - indexFile: file, - } - return nm -} - -const ( - RowsToRead = 1024 -) - -func LoadNeedleMap(file *os.File) (*NeedleMap, error) { - nm := NewNeedleMap(file) - e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { - if key > nm.MaximumFileKey { - nm.MaximumFileKey = key - } - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - } else { - oldSize := nm.m.Delete(Key(key)) - glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - return nil - }) - glog.V(1).Infoln("max file key:", nm.MaximumFileKey) - return nm, e -} - -// walks through the index file, calls 
fn function with each key, offset, size -// stops with the error returned by the fn function -func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { - var readerOffset int64 - bytes := make([]byte, 16*RowsToRead) - count, e := r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - readerOffset += int64(count) - var ( - key uint64 - offset, size uint32 - i int - ) - - for count > 0 && e == nil || e == io.EOF { - for i = 0; i+16 <= count; i += 16 { - key = util.BytesToUint64(bytes[i : i+8]) - offset = util.BytesToUint32(bytes[i+8 : i+12]) - size = util.BytesToUint32(bytes[i+12 : i+16]) - if e = fn(key, offset, size); e != nil { - return e - } - } - if e == io.EOF { - return nil - } - count, e = r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - readerOffset += int64(count) - } - return e -} - -func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { - if key > nm.MaximumFileKey { - nm.MaximumFileKey = key - } - oldSize := nm.m.Set(Key(key), offset, size) +func appendToIndexFile(indexFile *os.File, + key uint64, offset uint32, size uint32) error { bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) util.Uint32toBytes(bytes[8:12], offset) util.Uint32toBytes(bytes[12:16], size) - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) + if _, err := indexFile.Seek(0, 2); err != nil { + return fmt.Errorf("cannot seek end of indexfile %s: %v", + indexFile.Name(), err) } - return nm.indexFile.Write(bytes) + _, err := indexFile.Write(bytes) + return err } -func (nm *NeedleMap) Get(key uint64) 
(element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) - return + +func (mm *mapMetric) logDelete(deletedByteCount uint32) { + mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) + mm.DeletionCounter++ } -func (nm *NeedleMap) Delete(key uint64) error { - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key))) - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], 0) - util.Uint32toBytes(bytes[12:16], 0) - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) + +func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { + if key > mm.MaximumFileKey { + mm.MaximumFileKey = key } - if _, err := nm.indexFile.Write(bytes); err != nil { - return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err) + mm.FileCounter++ + mm.FileByteCounter = mm.FileByteCounter + uint64(newSize) + if oldSize > 0 { + mm.DeletionCounter++ + mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) } - nm.DeletionCounter++ - return nil -} -func (nm *NeedleMap) Close() { - _ = nm.indexFile.Close() -} -func (nm *NeedleMap) Destroy() error { - nm.Close() - return os.Remove(nm.indexFile.Name()) -} -func (nm NeedleMap) ContentSize() uint64 { - return nm.FileByteCounter -} -func (nm NeedleMap) DeletedSize() uint64 { - return nm.DeletionByteCounter -} -func (nm NeedleMap) FileCount() int { - return nm.FileCounter -} -func (nm NeedleMap) DeletedCount() int { - return nm.DeletionCounter -} -func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { - return nm.m.Visit(visit) -} -func (nm NeedleMap) MaxFileKey() uint64 { - return nm.MaximumFileKey } diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go new file mode 100644 index 000000000..73595278d --- /dev/null +++ b/go/storage/needle_map_leveldb.go @@ -0,0 +1,151 @@ +package 
storage + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" + "github.com/syndtr/goleveldb/leveldb" +) + +type LevelDbNeedleMap struct { + dbFileName string + indexFile *os.File + db *leveldb.DB + mapMetric +} + +func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { + m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + if !isLevelDbFresh(dbFileName, indexFile) { + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + generateDbFile(dbFileName, indexFile) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", dbFileName) + if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil { + return + } + glog.V(1).Infof("Loading %s...", indexFile.Name()) + nm, indexLoadError := LoadNeedleMap(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = nm.mapMetric + return +} + +func isLevelDbFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG")) + if err != nil { + return false + } + defer dbLogFile.Close() + dbStat, dbStatErr := dbLogFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func generateDbFile(dbFileName string, indexFile *os.File) error { + db, err := leveldb.OpenFile(dbFileName, nil) + if err != nil { + return err + } + defer db.Close() + return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + if offset > 0 { + levelDbWrite(db, key, offset, size) + } else { + levelDbDelete(db, key) + } + return nil + }) +} + +func (m *LevelDbNeedleMap) Get(key uint64) (element 
*NeedleValue, ok bool) { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + data, err := m.db.Get(bytes, nil) + if err != nil || len(data) != 8 { + glog.V(0).Infof("Failed to get %d %v", key, err) + return nil, false + } + offset := util.BytesToUint32(data[0:4]) + size := util.BytesToUint32(data[4:8]) + return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true +} + +func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { + var oldSize uint32 + if oldNeedle, ok := m.Get(key); ok { + oldSize = oldNeedle.Size + } + m.logPut(key, oldSize, size) + // write to index file first + if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) + } + return levelDbWrite(m.db, key, offset, size) +} + +func levelDbWrite(db *leveldb.DB, + key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil { + return fmt.Errorf("failed to write leveldb: %v", err) + } + return nil +} +func levelDbDelete(db *leveldb.DB, key uint64) error { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + return db.Delete(bytes, nil) +} + +func (m *LevelDbNeedleMap) Delete(key uint64) error { + if oldNeedle, ok := m.Get(key); ok { + m.logDelete(oldNeedle.Size) + } + // write to index file first + if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + return err + } + return levelDbDelete(m.db, key) +} + +func (m *LevelDbNeedleMap) Close() { + m.db.Close() +} + +func (m *LevelDbNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.dbFileName) +} + +func (m *LevelDbNeedleMap) ContentSize() uint64 { + return m.FileByteCounter +} +func (m *LevelDbNeedleMap) DeletedSize() uint64 { + return m.DeletionByteCounter +} 
+func (m *LevelDbNeedleMap) FileCount() int { + return m.FileCounter +} +func (m *LevelDbNeedleMap) DeletedCount() int { + return m.DeletionCounter +} +func (m *LevelDbNeedleMap) MaxFileKey() uint64 { + return m.MaximumFileKey +} diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go new file mode 100644 index 000000000..5fce301bc --- /dev/null +++ b/go/storage/needle_map_memory.go @@ -0,0 +1,126 @@ +package storage + +import ( + "io" + "os" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" +) + +type NeedleMap struct { + indexFile *os.File + m CompactMap + + mapMetric +} + +func NewNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: NewCompactMap(), + indexFile: file, + } + return nm +} + +const ( + RowsToRead = 1024 +) + +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { + nm := NewNeedleMap(file) + e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { + if key > nm.MaximumFileKey { + nm.MaximumFileKey = key + } + nm.FileCounter++ + nm.FileByteCounter = nm.FileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + } else { + oldSize := nm.m.Delete(Key(key)) + glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + return nil + }) + glog.V(1).Infoln("max file key:", nm.MaximumFileKey) + return nm, e +} + +// walks through the index file, calls fn function with each key, offset, size +// stops with the error returned by the fn function +func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { + var readerOffset int64 + bytes := make([]byte, 
16*RowsToRead) + count, e := r.ReadAt(bytes, readerOffset) + glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + readerOffset += int64(count) + var ( + key uint64 + offset, size uint32 + i int + ) + + for count > 0 && e == nil || e == io.EOF { + for i = 0; i+16 <= count; i += 16 { + key = util.BytesToUint64(bytes[i : i+8]) + offset = util.BytesToUint32(bytes[i+8 : i+12]) + size = util.BytesToUint32(bytes[i+12 : i+16]) + if e = fn(key, offset, size); e != nil { + return e + } + } + if e == io.EOF { + return nil + } + count, e = r.ReadAt(bytes, readerOffset) + glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + readerOffset += int64(count) + } + return e +} + +func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { + oldSize := nm.m.Set(Key(key), offset, size) + nm.logPut(key, oldSize, size) + return appendToIndexFile(nm.indexFile, key, offset, size) +} +func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + element, ok = nm.m.Get(Key(key)) + return +} +func (nm *NeedleMap) Delete(key uint64) error { + deletedBytes := nm.m.Delete(Key(key)) + nm.logDelete(deletedBytes) + return appendToIndexFile(nm.indexFile, key, 0, 0) +} +func (nm *NeedleMap) Close() { + _ = nm.indexFile.Close() +} +func (nm *NeedleMap) Destroy() error { + nm.Close() + return os.Remove(nm.indexFile.Name()) +} +func (nm NeedleMap) ContentSize() uint64 { + return nm.FileByteCounter +} +func (nm NeedleMap) DeletedSize() uint64 { + return nm.DeletionByteCounter +} +func (nm NeedleMap) FileCount() int { + return nm.FileCounter +} +func (nm NeedleMap) DeletedCount() int { + return nm.DeletionCounter +} + +func (nm NeedleMap) MaxFileKey() uint64 { + return nm.MaximumFileKey +} diff --git a/go/storage/store.go b/go/storage/store.go index 2695537f6..65e5b218b 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -90,18 +90,18 @@ func (s *Store) String() (str string) { return } -func 
NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) { +func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) { s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} location.volumes = make(map[VolumeId]*Volume) - location.loadExistingVolumes() + location.loadExistingVolumes(useLevelDb) s.Locations = append(s.Locations, location) } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error { +func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, rt, ttl) + e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil { e = err } } @@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error { +func (s *Store) addVolume(vid 
VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -195,20 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *Rep return fmt.Errorf("No more free space left") } -func (s *Store) FreezeVolume(volumeIdString string) error { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) - } - if v := s.findVolume(vid); v != nil { - if v.readOnly { - return fmt.Errorf("Volume %s is already read-only", volumeIdString) - } - return v.freeze() - } - return fmt.Errorf("volume id %d is not found during freeze", vid) -} -func (l *DiskLocation) loadExistingVolumes() { +func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { name := dir.Name() @@ -221,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil { l.volumes[vid] = v glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } @@ -261,7 +248,7 @@ func (s *Store) SetRack(rack string) { 
func (s *Store) SetBootstrapMaster(bootstrapMaster string) { s.masterNodes = NewMasterNodes(bootstrapMaster) } -func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) { +func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { masterNode, e = s.masterNodes.findMaster() if e != nil { return @@ -317,13 +304,16 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) { return "", "", err } - jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data) + joinUrl := "http://" + masterNode + "/dir/join" + + jsonBlob, err := util.PostBytes(joinUrl, data) if err != nil { s.masterNodes.reset() return "", "", err } var ret operation.JoinResult if err := json.Unmarshal(jsonBlob, &ret); err != nil { + glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) return masterNode, "", err } if ret.Error != "" { @@ -354,7 +344,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) - if _, _, e := s.Join(); e != nil { + if _, _, e := s.SendHeartbeatToMaster(); e != nil { glog.V(0).Infoln("error when reporting size:", e) } } diff --git a/go/storage/volume.go b/go/storage/volume.go index 1988c9aac..2b47fb497 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -27,10 +27,10 @@ type Volume struct { lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} - e = v.load(true, true) + e 
= v.load(true, true, useLevelDb) return } func (v *Volume) String() string { @@ -40,7 +40,7 @@ func (v *Volume) String() string { func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} - e = v.load(false, false) + e = v.load(false, false, false) return } func (v *Volume) FileName() (fileName string) { @@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) { } return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error { var e error fileName := v.FileName() @@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - if v.readOnly { - if v.ensureConvertIdxToCdb(fileName) { - v.nm, e = OpenCdbMap(fileName + ".cdb") - return e - } - } var indexFile *os.File if v.readOnly { glog.V(1).Infoln("open to read file", fileName+".idx") @@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly) - if v.nm, e = LoadNeedleMap(indexFile); e != nil { - glog.V(0).Infoln("loading error:", e) + if !useLevelDb { + glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) + if v.nm, e = LoadNeedleMap(indexFile); e != nil { + glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) + } + } else { + glog.V(0).Infoln("loading leveldb file", fileName+".ldb") + if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { + glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) + } } } return e @@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { } nv, ok := v.nm.Get(n.Id) if !ok || 
int64(nv.Offset)*NeedlePaddingSize < offset { - if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { + if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } @@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) { return -1, errors.New("Not Found") } -func (v *Volume) freeze() error { - if v.readOnly { - return nil - } - nm, ok := v.nm.(*NeedleMap) - if !ok { - return nil - } - v.accessLock.Lock() - defer v.accessLock.Unlock() - bn, _ := baseFilename(v.dataFile.Name()) - cdbFn := bn + ".cdb" - glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn) - err := DumpNeedleMapToCdb(cdbFn, nm) - if err != nil { - return err - } - if v.nm, err = OpenCdbMap(cdbFn); err != nil { - return err - } - nm.indexFile.Close() - os.Remove(nm.indexFile.Name()) - v.readOnly = true - return nil -} - func ScanVolumeFile(dirname string, collection string, id VolumeId, visitSuperBlock func(SuperBlock) error, readNeedleBody bool, @@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti modTime = fi.ModTime() return } -func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { - var indexFile *os.File - var e error - _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb") - _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx") - if cdbCanRead && cdbModTime.After(idxModeTime) { - return true - } - if !cdbCanWrite { - return false - } - if !idxCanRead { - glog.V(0).Infoln("Can not read file", fileName+".idx!") - return false - } - glog.V(2).Infoln("opening file", fileName+".idx") - if indexFile, e = os.Open(fileName + ".idx"); e != nil { - glog.V(0).Infoln("Failed to read file", fileName+".idx !") - return false - } - defer indexFile.Close() - glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) - if e = ConvertIndexToCdb(fileName+".cdb", indexFile); 
e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e) - return false - } - return true -} // volume is expired if modified time + volume ttl < now // except when volume is empty diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7e026a61d..9f6f8e35f 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error { } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - if e = v.load(true, false); e != nil { + if e = v.load(true, false, false); e != nil { return e } return nil @@ -73,7 +73,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, err = n.Append(dst, v.Version()); err != nil { |
