aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-05-05 11:19:41 -0700
committerChris Lu <chris.lu@gmail.com>2013-05-05 11:19:41 -0700
commit715d327df0ad64a70837711c664e1ef024e0bcc5 (patch)
treed86297c936848cb4ec17b6ad11854b28021e1d90
parentfb635146a139f350b126cdb6d93821cad13cdde5 (diff)
downloadseaweedfs-715d327df0ad64a70837711c664e1ef024e0bcc5.tar.xz
seaweedfs-715d327df0ad64a70837711c664e1ef024e0bcc5.zip
Tamás Gulácsi contributed cdb map for read only extremely low memory
implementation.
-rw-r--r--go/storage/cdb_map.go230
-rw-r--r--go/storage/cdb_map_test.go168
2 files changed, 398 insertions, 0 deletions
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go
new file mode 100644
index 000000000..ebb49d514
--- /dev/null
+++ b/go/storage/cdb_map.go
@@ -0,0 +1,230 @@
+package storage
+
+import (
+ "code.google.com/p/weed-fs/go/util"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/tgulacsi/go-cdb"
+ "os"
+ "path/filepath"
+)
+
+// CDB-backed read-only needle map
+type cdbMap struct {
+ c1, c2 *cdb.Cdb
+ fn1, fn2 string
+ mapMetric
+}
+
+const maxCdbRecCount = 100000000
+
+var errReadOnly = errors.New("cannot modify a read-only map")
+
+// opens the cdb file(s) (base.cdb OR base.1.cdb AND base.2.cdb)
+// in case of two files, the metric (at key 'M') must be in base.2.cdb
+func OpenCdbMap(fileName string) (m *cdbMap, err error) {
+ m = new(cdbMap)
+ if m.c1, err = cdb.Open(fileName); err == nil {
+ m.fn1 = fileName
+ err = getMetric(m.c1, &m.mapMetric)
+ return
+ }
+ if os.IsNotExist(err) {
+ bn, ext := nakeFilename(fileName)
+ m.fn1 = bn + ".1" + ext
+ if m.c1, err = cdb.Open(m.fn1); err != nil {
+ return nil, err
+ }
+ m.fn2 = bn + ".2" + ext
+ if m.c2, err = cdb.Open(m.fn2); err != nil {
+ return nil, err
+ }
+ err = getMetric(m.c2, &m.mapMetric)
+ return
+ }
+ return nil, err
+}
+
+func (m *cdbMap) Put(key uint64, offset uint32, size uint32) (int, error) {
+ return -1, errReadOnly
+}
+func (m *cdbMap) Delete(key uint64) error {
+ return errReadOnly
+}
+
+func (m *cdbMap) Close() {
+ if m.c2 != nil {
+ m.c2.Close()
+ m.c2 = nil
+ }
+ if m.c1 != nil {
+ m.c1.Close()
+ m.c1 = nil
+ }
+}
+
+func (m cdbMap) ContentSize() uint64 {
+ return m.FileByteCounter
+}
+func (m cdbMap) DeletedSize() uint64 {
+ return m.DeletionByteCounter
+}
+func (m cdbMap) FileCount() int {
+ return m.FileCounter
+}
+func (m *cdbMap) DeletedCount() int {
+ return m.DeletionCounter
+}
+
+func getMetric(c *cdb.Cdb, m *mapMetric) error {
+ data, err := c.Data([]byte{'M'})
+ if err != nil {
+ return err
+ }
+ return json.Unmarshal(data, m)
+}
+
+func (m cdbMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ var (
+ data []byte
+ k []byte = make([]byte, 8)
+ err error
+ )
+ util.Uint64toBytes(k, key)
+ if data, err = m.c1.Data(k); err != nil || data == nil {
+ if m.c2 == nil {
+ return nil, false
+ }
+ if data, err = m.c2.Data(k); err != nil || data == nil {
+ return nil, false
+ }
+ }
+ return &NeedleValue{Key: Key(key), Offset: util.BytesToUint32(data[:4]),
+ Size: util.BytesToUint32(data[4:])}, true
+}
+
+func (m cdbMap) Visit(visit func(NeedleValue) error) (err error) {
+ fh, err := os.Open(m.fn1)
+ if err != nil {
+ return fmt.Errorf("cannot open %s: %s", m.fn1, err)
+ }
+ defer fh.Close()
+ walk := func(elt cdb.Element) error {
+ if len(elt.Key) != 8 {
+ return nil
+ }
+ return visit(NeedleValue{Key: Key(util.BytesToUint64(elt.Key)),
+ Offset: util.BytesToUint32(elt.Data[:4]),
+ Size: util.BytesToUint32(elt.Data[4:8])})
+ }
+ if err = cdb.DumpMap(fh, walk); err != nil {
+ return err
+ }
+ if m.c2 == nil {
+ return nil
+ }
+ fh.Close()
+ if fh, err = os.Open(m.fn2); err != nil {
+ return fmt.Errorf("cannot open %s: %s", m.fn2, err)
+ }
+ return cdb.DumpMap(fh, walk)
+}
+
+// converts an .idx index to a cdb
+func ConvertIndexToCdb(cdbName string, index *os.File) error {
+ idx, err := LoadNeedleMap(index)
+ if err != nil {
+ return fmt.Errorf("error loading needle map %s: %s", index, err)
+ }
+ defer idx.Close()
+ return DumpNeedleMapToCdb(cdbName, idx)
+}
+
+// dumps a NeedleMap into a cdb
+func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error {
+ tempnam := cdbName + "t"
+ fnames := make([]string, 1, 2)
+ adder, closer, err := openTempCdb(tempnam)
+ if err != nil {
+ return fmt.Errorf("error creating factory: %s", err)
+ }
+ fnames[0] = tempnam
+
+ elt := cdb.Element{Key: make([]byte, 8), Data: make([]byte, 8)}
+
+ fcount := uint64(0)
+ walk := func(key uint64, offset, size uint32) error {
+ if fcount >= maxCdbRecCount {
+ if err = closer(); err != nil {
+ return err
+ }
+ tempnam = cdbName + "t2"
+ if adder, closer, err = openTempCdb(tempnam); err != nil {
+ return fmt.Errorf("error creating second factory: %s", err)
+ }
+ fnames = append(fnames, tempnam)
+ fcount = 0
+ }
+ util.Uint64toBytes(elt.Key, key)
+ util.Uint32toBytes(elt.Data[:4], offset)
+ util.Uint32toBytes(elt.Data[4:], size)
+ fcount++
+ return adder(elt)
+ }
+ // and write out the cdb from there
+ err = nm.Visit(func(nv NeedleValue) error {
+ return walk(uint64(nv.Key), nv.Offset, nv.Size)
+ })
+ if err != nil {
+ closer()
+ return fmt.Errorf("error walking index %s: %s", nm, err)
+ }
+ // store fileBytes
+ data, e := json.Marshal(nm.mapMetric)
+ if e != nil {
+ return fmt.Errorf("error marshaling metric %s: %s", nm.mapMetric, e)
+ }
+ if err = adder(cdb.Element{Key: []byte{'M'}, Data: data}); err != nil {
+ return err
+ }
+ if err = closer(); err != nil {
+ return err
+ }
+
+ os.Remove(cdbName)
+ if len(fnames) == 1 {
+ return os.Rename(fnames[0], cdbName)
+ }
+ bn, ext := nakeFilename(cdbName)
+ if err = os.Rename(fnames[0], bn+".1"+ext); err != nil {
+ return err
+ }
+ return os.Rename(fnames[1], bn+".2"+ext)
+}
+
+func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) {
+ fh, err := os.Create(fileName)
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err)
+ }
+ adder, closer, err := cdb.MakeFactory(fh)
+ if err != nil {
+ fh.Close()
+ return nil, nil, fmt.Errorf("error creating factory: %s", err)
+ }
+ return adder, func() error {
+ if e := closer(); e != nil {
+ fh.Close()
+ return e
+ }
+ fh.Close()
+ return nil
+ }, nil
+}
+
+// returns filename without extension, and the extension
+func nakeFilename(fileName string) (string, string) {
+ ext := filepath.Ext(fileName)
+ return fileName[:len(fileName)-len(ext)], ext
+}
diff --git a/go/storage/cdb_map_test.go b/go/storage/cdb_map_test.go
new file mode 100644
index 000000000..e932a7ec5
--- /dev/null
+++ b/go/storage/cdb_map_test.go
@@ -0,0 +1,168 @@
+package storage
+
+import (
+ "log"
+ "math/rand"
+ "os"
+ "runtime"
+ "testing"
+)
+
+var testIndexFilename string = "../../test/sample.idx"
+
+func TestCdbMap0Convert(t *testing.T) {
+ indexFile, err := os.Open(testIndexFilename)
+ if err != nil {
+ t.Fatalf("cannot open %s: %s", testIndexFilename, err)
+ }
+ defer indexFile.Close()
+
+ cdbFn := testIndexFilename + ".cdb"
+ t.Logf("converting %s to %s", cdbFn, cdbFn)
+ if err = ConvertIndexToCdb(cdbFn, indexFile); err != nil {
+ t.Fatalf("error while converting: %s", err)
+ }
+}
+
+func TestCdbMap1Mem(t *testing.T) {
+ var nm NeedleMapper
+ i := 0
+ visit := func(nv NeedleValue) error {
+ i++
+ return nil
+ }
+
+ a := getMemStats()
+ t.Logf("opening %s.cdb", testIndexFilename)
+ nm, err := OpenCdbMap(testIndexFilename + ".cdb")
+ if err != nil {
+ t.Fatalf("error opening cdb: %s", err)
+ }
+ b := getMemStats()
+ log.Printf("opening cdb consumed %d bytes", b-a)
+ defer nm.Close()
+
+ a = getMemStats()
+ if err = nm.Visit(visit); err != nil {
+ t.Fatalf("error visiting %s: %s", nm, err)
+ }
+ b = getMemStats()
+ log.Printf("visit cdb %d consumed %d bytes", i, b-a)
+ nm.Close()
+
+ indexFile, err := os.Open(testIndexFilename)
+ if err != nil {
+ t.Fatalf("error opening idx: %s", err)
+ }
+ a = getMemStats()
+ nm, err = LoadNeedleMap(indexFile)
+ if err != nil {
+ t.Fatalf("error loading idx: %s", err)
+ }
+ defer nm.Close()
+ b = getMemStats()
+ log.Printf("opening idx consumed %d bytes", b-a)
+
+ i = 0
+ a = getMemStats()
+ if err = nm.Visit(visit); err != nil {
+ t.Fatalf("error visiting %s: %s", nm, err)
+ }
+ b = getMemStats()
+ log.Printf("visit idx %d consumed %d bytes", i, b-a)
+}
+
+func BenchmarkCdbMap9List(t *testing.B) {
+ t.StopTimer()
+ indexFile, err := os.Open(testIndexFilename)
+ if err != nil {
+ t.Fatalf("cannot open %s: %s", testIndexFilename, err)
+ }
+ defer indexFile.Close()
+
+ a := getMemStats()
+ t.Logf("opening %s", indexFile)
+ idx, err := LoadNeedleMap(indexFile)
+ if err != nil {
+ t.Fatalf("cannot load %s: %s", indexFile, err)
+ }
+ defer idx.Close()
+ b := getMemStats()
+ log.Printf("LoadNeedleMap consumed %d bytes", b-a)
+
+ cdbFn := testIndexFilename + ".cdb"
+ a = getMemStats()
+ t.Logf("opening %s", cdbFn)
+ m, err := OpenCdbMap(cdbFn)
+ if err != nil {
+ t.Fatalf("error opening %s: %s", cdbFn, err)
+ }
+ defer m.Close()
+ b = getMemStats()
+ log.Printf("OpenCdbMap consumed %d bytes", b-a)
+
+ i := 0
+ log.Printf("checking whether the cdb contains every key")
+ t.StartTimer()
+ err = idx.Visit(func(nv NeedleValue) error {
+ if i > t.N || rand.Intn(10) < 9 {
+ return nil
+ }
+ i++
+ if i%1000 == 0 {
+ log.Printf("%d. %s", i, nv)
+ }
+ if nv2, ok := m.Get(uint64(nv.Key)); !ok || nv2 == nil {
+ t.Errorf("%s in index, not in cdb", nv.Key)
+ } else if nv2.Key != nv.Key {
+ t.Errorf("requested key %d from cdb, got %d", nv.Key, nv2.Key)
+ } else if nv2.Offset != nv.Offset {
+ t.Errorf("offset is %d in index, %d in cdb", nv.Offset, nv2.Offset)
+ } else if nv2.Size != nv.Size {
+ t.Errorf("size is %d in index, %d in cdb", nv.Size, nv2.Size)
+ }
+ t.SetBytes(int64(nv.Size))
+ return nil
+ })
+ t.StopTimer()
+ if err != nil {
+ t.Errorf("error visiting index: %s", err)
+ }
+
+ i = 0
+ log.Printf("checking wheter the cdb contains no stray keys")
+ t.StartTimer()
+ err = m.Visit(func(nv NeedleValue) error {
+ if i > t.N || rand.Intn(10) < 9 {
+ return nil
+ }
+ if nv2, ok := m.Get(uint64(nv.Key)); !ok || nv2 == nil {
+ t.Errorf("%s in cdb, not in index", nv.Key)
+ } else if nv2.Key != nv.Key {
+ t.Errorf("requested key %d from index, got %d", nv.Key, nv2.Key)
+ } else if nv2.Offset != nv.Offset {
+ t.Errorf("offset is %d in cdb, %d in index", nv.Offset, nv2.Offset)
+ } else if nv2.Size != nv.Size {
+ t.Errorf("size is %d in cdb, %d in index", nv.Size, nv2.Size)
+ }
+ i++
+ if i%1000 == 0 {
+ log.Printf("%d. %s", i, nv)
+ }
+ t.SetBytes(int64(nv.Size))
+ return nil
+ })
+ t.StopTimer()
+ if err != nil {
+ t.Errorf("error visiting index: %s", err)
+ }
+}
+
+var mem = new(runtime.MemStats)
+
+// returns MemStats.Alloc after a GC
+func getMemStats() int64 {
+ runtime.GC()
+ runtime.ReadMemStats(mem)
+ return int64(mem.Alloc)
+}