aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed-fs/src/pkg/storage/cdb_map.go107
-rw-r--r--weed-fs/src/pkg/storage/compact_map.go4
-rw-r--r--weed-fs/src/pkg/storage/needle_map.go112
-rw-r--r--weed-fs/src/pkg/storage/volume.go4
4 files changed, 32 insertions, 195 deletions
diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go
deleted file mode 100644
index b80b3cde9..000000000
--- a/weed-fs/src/pkg/storage/cdb_map.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package storage
-
-import (
- "github.com/tgulacsi/go-cdb"
- "io"
- "log"
- "os"
- "pkg/util"
- "strings"
-)
-
-type CdbMap struct {
- db *cdb.Cdb
- transient []byte
- Filename string
-}
-
-// Opens the CDB file and servers as a needle map
-func NewCdbMap(filename string) (*CdbMap, error) {
- m, err := cdb.Open(filename)
- if err != nil {
- return nil, err
- }
- return &CdbMap{db: m, transient: make([]byte, 8),
- Filename: filename}, nil
-}
-
-// writes the content of the index file to a CDB and returns that
-func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
- nm := indexFile.Name()
- nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb"
-
- var (
- key uint64
- offset uint32
- ok bool
- )
- deleted := make(map[uint64]bool, 16)
- gatherDeletes := func(buf []byte) error {
- key = util.BytesToUint64(buf[:8])
- offset = util.BytesToUint32(buf[8:12])
- if offset > 0 {
- if _, ok = deleted[key]; ok { //undelete
- delete(deleted, key)
- }
- } else {
- deleted[key] = true
- }
- return nil
- }
- if err := readIndexFile(indexFile, gatherDeletes); err != nil {
- return nil, err
- }
-
- w, err := cdb.NewWriter(nm)
- if err != nil {
- return nil, err
- }
- iterFun := func(buf []byte) error {
- key = util.BytesToUint64(buf[:8])
- if _, ok = deleted[key]; !ok {
- w.PutPair(buf[:8], buf[8:16])
- }
- return nil
- }
- indexFile.Seek(0, 0)
- err = readIndexFile(indexFile, iterFun)
- w.Close()
- if err != nil {
- return nil, err
- }
-
- return NewCdbMap(nm)
-}
-
-func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) {
- util.Uint64toBytes(m.transient, uint64(key))
- data, err := m.db.Data(m.transient)
- if err != nil {
- if err == io.EOF {
- return nil, false
- }
- log.Printf("error getting %s: %s", key, err)
- return nil, false
- }
- return &NeedleValue{Key: key,
- Offset: util.BytesToUint32(data[:4]),
- Size: util.BytesToUint32(data[4:8]),
- }, true
-}
-
-func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
- r, err := os.Open(m.Filename)
- if err != nil {
- return err
- }
- defer r.Close()
-
- iterFunc := func(elt cdb.Element) error {
- return pedestrian(&NeedleValue{
- Key: Key(util.BytesToUint64(elt.Key[:8])),
- Offset: util.BytesToUint32(elt.Data[:4]),
- Size: util.BytesToUint32(elt.Data[4:8]),
- })
- }
- return cdb.DumpMap(r, iterFunc)
-}
diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go
index 61cc2c841..90ed42198 100644
--- a/weed-fs/src/pkg/storage/compact_map.go
+++ b/weed-fs/src/pkg/storage/compact_map.go
@@ -109,8 +109,8 @@ type CompactMap struct {
list []CompactSection
}
-func NewCompactMap() *CompactMap {
- return &CompactMap{}
+func NewCompactMap() CompactMap {
+ return CompactMap{}
}
func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
index 9d7369509..9ce38c1f2 100644
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ b/weed-fs/src/pkg/storage/needle_map.go
@@ -1,7 +1,6 @@
package storage
import (
- "io"
"log"
"os"
"pkg/util"
@@ -9,8 +8,7 @@ import (
type NeedleMap struct {
indexFile *os.File
- m MapGetSetter // modifiable map
- fm MapGetter // frozen map
+ m CompactMap
//transient
bytes []byte
@@ -21,106 +19,52 @@ type NeedleMap struct {
fileByteCounter uint64
}
-// Map interface for frozen maps
-type MapGetter interface {
- Get(key Key) (element *NeedleValue, ok bool)
- Walk(pedestrian func(*NeedleValue) error) error
-}
-
-// Modifiable map interface
-type MapSetter interface {
- Set(key Key, offset, size uint32) (oldsize uint32)
- Delete(key Key) uint32
-}
-
-// Settable and gettable map
-type MapGetSetter interface {
- MapGetter
- MapSetter
-}
-
-// New in-memory needle map, backed by "file" index file
func NewNeedleMap(file *os.File) *NeedleMap {
- return &NeedleMap{
+ nm := &NeedleMap{
m: NewCompactMap(),
bytes: make([]byte, 16),
indexFile: file,
}
-}
-
-// Nes frozen (on-disk, not modifiable(!)) needle map
-func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
- fm, err := NewCdbMapFromIndex(file)
- if err != nil {
- return nil, err
- }
- return &NeedleMap{
- fm: fm,
- bytes: make([]byte, 16),
- }, nil
+ return nm
}
const (
RowsToRead = 1024
)
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
+func LoadNeedleMap(file *os.File) *NeedleMap {
nm := NewNeedleMap(file)
-
- var (
- key uint64
- offset, size, oldSize uint32
- )
- iterFun := func(buf []byte) error {
- key = util.BytesToUint64(buf[:8])
- offset = util.BytesToUint32(buf[8:12])
- size = util.BytesToUint32(buf[12:16])
- nm.fileCounter++
- nm.fileByteCounter = nm.fileByteCounter + uint64(size)
- if offset > 0 {
- oldSize = nm.m.Set(Key(key), offset, size)
- //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
- }
- } else {
- nm.m.Delete(Key(key))
- //log.Println("removing key", key)
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
- }
-
- return nil
- }
- if err := readIndexFile(file, iterFun); err != nil {
- return nil, err
- }
- return nm, nil
-}
-
-// calls iterFun with each row (raw 16 bytes)
-func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
- buf := make([]byte, 16*RowsToRead)
- count, e := io.ReadAtLeast(indexFile, buf, 16)
- if e != nil && count > 0 {
- fstat, err := indexFile.Stat()
- if err != nil {
- log.Println("ERROR stating %s: %s", indexFile, err)
- } else {
- log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
- }
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := nm.indexFile.Read(bytes)
+ if count > 0 {
+ fstat, _ := file.Stat()
+ log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
}
for count > 0 && e == nil {
for i := 0; i < count; i += 16 {
- if e = iterFun(buf[i : i+16]); e != nil {
- return e
+ key := util.BytesToUint64(bytes[i : i+8])
+ offset := util.BytesToUint32(bytes[i+8 : i+12])
+ size := util.BytesToUint32(bytes[i+12 : i+16])
+ nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ nm.m.Delete(Key(key))
+ //log.Println("removing key", key)
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
}
}
- count, e = io.ReadAtLeast(indexFile, buf, 16)
+ count, e = nm.indexFile.Read(bytes)
}
- return nil
+ return nm
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 71dfb5aee..5e64d0763 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -48,8 +48,8 @@ func (v *Volume) load() error {
if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
}
- v.nm, e = LoadNeedleMap(indexFile)
- return e
+ v.nm = LoadNeedleMap(indexFile)
+ return nil
}
func (v *Volume) Version() Version {
return v.version