aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/compact_map.go182
-rw-r--r--weed/storage/compact_map_perf_test.go43
-rw-r--r--weed/storage/compact_map_test.go63
-rw-r--r--weed/storage/compress.go57
-rw-r--r--weed/storage/crc.go21
-rw-r--r--weed/storage/needle.go132
-rw-r--r--weed/storage/needle_map.go99
-rw-r--r--weed/storage/needle_read_write.go238
-rw-r--r--weed/storage/replication_type.go123
-rw-r--r--weed/storage/sample.idxbin0 -> 27140560 bytes
-rw-r--r--weed/storage/store.go204
-rw-r--r--weed/storage/volume.go274
-rw-r--r--weed/storage/volume_id.go18
-rw-r--r--weed/storage/volume_info.go13
-rw-r--r--weed/storage/volume_version.go11
15 files changed, 1478 insertions, 0 deletions
diff --git a/weed/storage/compact_map.go b/weed/storage/compact_map.go
new file mode 100644
index 000000000..0b33961c4
--- /dev/null
+++ b/weed/storage/compact_map.go
@@ -0,0 +1,182 @@
+package storage
+
+import ()
+
+type NeedleValue struct {
+ Key Key
+ Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G
+ Size uint32 "Size of the data portion"
+}
+
+const (
+ batch = 100000
+)
+
+type Key uint64
+
+type CompactSection struct {
+ values []NeedleValue
+ overflow map[Key]NeedleValue
+ start Key
+ end Key
+ counter int
+}
+
+func NewCompactSection(start Key) CompactSection {
+ return CompactSection{
+ values: make([]NeedleValue, batch),
+ overflow: make(map[Key]NeedleValue),
+ start: start,
+ }
+}
+
+//return old entry size
+func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 {
+ ret := uint32(0)
+ if key > cs.end {
+ cs.end = key
+ }
+ if i := cs.binarySearchValues(key); i >= 0 {
+ ret = cs.values[i].Size
+ //println("key", key, "old size", ret)
+ cs.values[i].Offset, cs.values[i].Size = offset, size
+ } else {
+ needOverflow := cs.counter >= batch
+ needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
+ if needOverflow {
+ //println("start", cs.start, "counter", cs.counter, "key", key)
+ if oldValue, found := cs.overflow[key]; found {
+ ret = oldValue.Size
+ }
+ cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
+ } else {
+ p := &cs.values[cs.counter]
+ p.Key, p.Offset, p.Size = key, offset, size
+ //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
+ cs.counter++
+ }
+ }
+ return ret
+}
+
+//return old entry size
+func (cs *CompactSection) Delete(key Key) uint32 {
+ ret := uint32(0)
+ if i := cs.binarySearchValues(key); i >= 0 {
+ if cs.values[i].Size > 0 {
+ ret = cs.values[i].Size
+ cs.values[i].Size = 0
+ }
+ }
+ if v, found := cs.overflow[key]; found {
+ delete(cs.overflow, key)
+ ret = v.Size
+ }
+ return ret
+}
+func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
+ if v, ok := cs.overflow[key]; ok {
+ return &v, true
+ }
+ if i := cs.binarySearchValues(key); i >= 0 {
+ return &cs.values[i], true
+ }
+ return nil, false
+}
+func (cs *CompactSection) binarySearchValues(key Key) int {
+ l, h := 0, cs.counter-1
+ if h >= 0 && cs.values[h].Key < key {
+ return -2
+ }
+ //println("looking for key", key)
+ for l <= h {
+ m := (l + h) / 2
+ //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
+ if cs.values[m].Key < key {
+ l = m + 1
+ } else if key < cs.values[m].Key {
+ h = m - 1
+ } else {
+ //println("found", m)
+ return m
+ }
+ }
+ return -1
+}
+
+//This map assumes mostly inserting increasing keys
+type CompactMap struct {
+ list []CompactSection
+}
+
+func NewCompactMap() CompactMap {
+ return CompactMap{}
+}
+
+func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ //println(x, "creating", len(cm.list), "section1, starting", key)
+ cm.list = append(cm.list, NewCompactSection(key))
+ x = len(cm.list) - 1
+ }
+ return cm.list[x].Set(key, offset, size)
+}
+func (cm *CompactMap) Delete(key Key) uint32 {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ return uint32(0)
+ }
+ return cm.list[x].Delete(key)
+}
+func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ return nil, false
+ }
+ return cm.list[x].Get(key)
+}
+func (cm *CompactMap) binarySearchCompactSection(key Key) int {
+ l, h := 0, len(cm.list)-1
+ if h < 0 {
+ return -5
+ }
+ if cm.list[h].start <= key {
+ if cm.list[h].counter < batch || key <= cm.list[h].end {
+ return h
+ } else {
+ return -4
+ }
+ }
+ for l <= h {
+ m := (l + h) / 2
+ if key < cm.list[m].start {
+ h = m - 1
+ } else { // cm.list[m].start <= key
+ if cm.list[m+1].start <= key {
+ l = m + 1
+ } else {
+ return m
+ }
+ }
+ }
+ return -3
+}
+
+func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
+ for _, cs := range cm.list {
+ for _, v := range cs.overflow {
+ if err := visit(v); err != nil {
+ return err
+ }
+ }
+ for _, v := range cs.values {
+ if _, found := cs.overflow[v.Key]; !found {
+ if err := visit(v); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/weed/storage/compact_map_perf_test.go b/weed/storage/compact_map_perf_test.go
new file mode 100644
index 000000000..b99356a73
--- /dev/null
+++ b/weed/storage/compact_map_perf_test.go
@@ -0,0 +1,43 @@
+package storage
+
+import (
+ "log"
+ "os"
+ "weed/util"
+ "testing"
+)
+
+func TestMemoryUsage(t *testing.T) {
+
+ indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
+ if ie != nil {
+ log.Fatalln(ie)
+ }
+ LoadNewNeedleMap(indexFile)
+
+}
+
+func LoadNewNeedleMap(file *os.File) CompactMap {
+ m := NewCompactMap()
+ bytes := make([]byte, 16*1024)
+ count, e := file.Read(bytes)
+ if count > 0 {
+ fstat, _ := file.Stat()
+ log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
+ }
+ for count > 0 && e == nil {
+ for i := 0; i < count; i += 16 {
+ key := util.BytesToUint64(bytes[i : i+8])
+ offset := util.BytesToUint32(bytes[i+8 : i+12])
+ size := util.BytesToUint32(bytes[i+12 : i+16])
+ if offset > 0 {
+ m.Set(Key(key), offset, size)
+ } else {
+ //delete(m, key)
+ }
+ }
+
+ count, e = file.Read(bytes)
+ }
+ return m
+}
diff --git a/weed/storage/compact_map_test.go b/weed/storage/compact_map_test.go
new file mode 100644
index 000000000..e76e9578d
--- /dev/null
+++ b/weed/storage/compact_map_test.go
@@ -0,0 +1,63 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestXYZ(t *testing.T) {
+ m := NewCompactMap()
+ for i := uint32(0); i < 100*batch; i += 2 {
+ m.Set(Key(i), i, i)
+ }
+
+ for i := uint32(0); i < 100*batch; i += 37 {
+ m.Delete(Key(i))
+ }
+
+ for i := uint32(0); i < 10*batch; i += 3 {
+ m.Set(Key(i), i+11, i+5)
+ }
+
+ // for i := uint32(0); i < 100; i++ {
+ // if v := m.Get(Key(i)); v != nil {
+ // println(i, "=", v.Key, v.Offset, v.Size)
+ // }
+ // }
+
+ for i := uint32(0); i < 10*batch; i++ {
+ v, ok := m.Get(Key(i))
+ if i%3 == 0 {
+ if !ok {
+ t.Fatal("key", i, "missing!")
+ }
+ if v.Size != i+5 {
+ t.Fatal("key", i, "size", v.Size)
+ }
+ } else if i%37 == 0 {
+ if ok && v.Size > 0 {
+ t.Fatal("key", i, "should have been deleted needle value", v)
+ }
+ } else if i%2 == 0 {
+ if v.Size != i {
+ t.Fatal("key", i, "size", v.Size)
+ }
+ }
+ }
+
+ for i := uint32(10 * batch); i < 100*batch; i++ {
+ v, ok := m.Get(Key(i))
+ if i%37 == 0 {
+ if ok && v.Size > 0 {
+ t.Fatal("key", i, "should have been deleted needle value", v)
+ }
+ } else if i%2 == 0 {
+ if v == nil {
+ t.Fatal("key", i, "missing")
+ }
+ if v.Size != i {
+ t.Fatal("key", i, "size", v.Size)
+ }
+ }
+ }
+
+}
diff --git a/weed/storage/compress.go b/weed/storage/compress.go
new file mode 100644
index 000000000..256789c9c
--- /dev/null
+++ b/weed/storage/compress.go
@@ -0,0 +1,57 @@
+package storage
+
+import (
+ "bytes"
+ "compress/flate"
+ "compress/gzip"
+ "io/ioutil"
+ "strings"
+)
+
+/*
+* Default more not to gzip since gzip can be done on client side.
+*/
+func IsGzippable(ext, mtype string) bool {
+ if strings.HasPrefix(mtype, "text/") {
+ return true
+ }
+ switch ext {
+ case ".zip", ".rar", ".gz", ".bz2", ".xz":
+ return false
+ case ".pdf", ".txt", ".html", ".css", ".js", ".json":
+ return true
+ }
+ if strings.HasPrefix(mtype, "application/") {
+ if strings.HasSuffix(mtype, "xml") {
+ return true
+ }
+ if strings.HasSuffix(mtype, "script") {
+ return true
+ }
+ }
+ return false
+}
+
+func GzipData(input []byte) ([]byte, error) {
+ buf := new(bytes.Buffer)
+ w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
+ if _, err := w.Write(input); err != nil {
+ println("error compressing data:", err)
+ return nil, err
+ }
+ if err := w.Close(); err != nil {
+ println("error closing compressed data:", err)
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+func UnGzipData(input []byte) ([]byte, error) {
+ buf := bytes.NewBuffer(input)
+ r, _ := gzip.NewReader(buf)
+ defer r.Close()
+ output, err := ioutil.ReadAll(r)
+ if err != nil {
+ println("error uncompressing data:", err)
+ }
+ return output, err
+}
diff --git a/weed/storage/crc.go b/weed/storage/crc.go
new file mode 100644
index 000000000..198352e68
--- /dev/null
+++ b/weed/storage/crc.go
@@ -0,0 +1,21 @@
+package storage
+
+import (
+ "hash/crc32"
+)
+
+var table = crc32.MakeTable(crc32.Castagnoli)
+
+type CRC uint32
+
+func NewCRC(b []byte) CRC {
+ return CRC(0).Update(b)
+}
+
+func (c CRC) Update(b []byte) CRC {
+ return CRC(crc32.Update(uint32(c), table, b))
+}
+
+func (c CRC) Value() uint32 {
+ return uint32(c>>15|c<<17) + 0xa282ead8
+}
diff --git a/weed/storage/needle.go b/weed/storage/needle.go
new file mode 100644
index 000000000..23016f98b
--- /dev/null
+++ b/weed/storage/needle.go
@@ -0,0 +1,132 @@
+package storage
+
+import (
+ "encoding/hex"
+ "fmt"
+ "io/ioutil"
+ "mime"
+ "net/http"
+ "path"
+ "weed/util"
+ "strconv"
+ "strings"
+)
+
+const (
+ NeedleHeaderSize = 16 //should never change this
+ NeedlePaddingSize = 8
+ NeedleChecksumSize = 4
+)
+
+type Needle struct {
+ Cookie uint32 "random number to mitigate brute force lookups"
+ Id uint64 "needle id"
+ Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime"
+
+ DataSize uint32 "Data size" //version2
+ Data []byte "The actual file data"
+ Flags byte "boolean flags" //version2
+ NameSize uint8 //version2
+ Name []byte "maximum 256 characters" //version2
+ MimeSize uint8 //version2
+ Mime []byte "maximum 256 characters" //version2
+
+ Checksum CRC "CRC32 to check integrity"
+ Padding []byte "Aligned to 8 bytes"
+}
+
+func NewNeedle(r *http.Request) (n *Needle, fname string, e error) {
+
+ n = new(Needle)
+ form, fe := r.MultipartReader()
+ if fe != nil {
+ fmt.Println("MultipartReader [ERROR]", fe)
+ e = fe
+ return
+ }
+ part, fe := form.NextPart()
+ if fe != nil {
+ fmt.Println("Reading Multi part [ERROR]", fe)
+ e = fe
+ return
+ }
+ fname = part.FileName()
+ fname = path.Base(fname)
+ data, _ := ioutil.ReadAll(part)
+ dotIndex := strings.LastIndex(fname, ".")
+ ext, mtype := "", ""
+ if dotIndex > 0 {
+ ext = fname[dotIndex:]
+ mtype = mime.TypeByExtension(ext)
+ }
+ contentType := part.Header.Get("Content-Type")
+ if contentType != "" && mtype != contentType && len(contentType) < 256 {
+ n.Mime = []byte(contentType)
+ n.SetHasMime()
+ mtype = contentType
+ }
+ if IsGzippable(ext, mtype) {
+ if data, e = GzipData(data); e != nil {
+ return
+ }
+ n.SetGzipped()
+ }
+ if ext == ".gz" {
+ n.SetGzipped()
+ }
+ if len(fname) < 256 {
+ if strings.HasSuffix(fname, ".gz") {
+ n.Name = []byte(fname[:len(fname)-3])
+ } else {
+ n.Name = []byte(fname)
+ }
+ n.SetHasName()
+ }
+
+ n.Data = data
+ n.Checksum = NewCRC(data)
+
+ commaSep := strings.LastIndex(r.URL.Path, ",")
+ dotSep := strings.LastIndex(r.URL.Path, ".")
+ fid := r.URL.Path[commaSep+1:]
+ if dotSep > 0 {
+ fid = r.URL.Path[commaSep+1 : dotSep]
+ }
+
+ n.ParsePath(fid)
+
+ return
+}
+func (n *Needle) ParsePath(fid string) {
+ length := len(fid)
+ if length <= 8 {
+ if length > 0 {
+ println("Invalid fid", fid, "length", length)
+ }
+ return
+ }
+ delta := ""
+ deltaIndex := strings.LastIndex(fid, "_")
+ if deltaIndex > 0 {
+ fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
+ }
+ n.Id, n.Cookie = ParseKeyHash(fid)
+ if delta != "" {
+ d, e := strconv.ParseUint(delta, 10, 64)
+ if e == nil {
+ n.Id += d
+ }
+ }
+}
+
+func ParseKeyHash(key_hash_string string) (uint64, uint32) {
+ key_hash_bytes, khe := hex.DecodeString(key_hash_string)
+ key_hash_len := len(key_hash_bytes)
+ if khe != nil || key_hash_len <= 4 {
+ println("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe)
+ return 0, 0
+ }
+ key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4])
+ hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len])
+ return key, hash
+}
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
new file mode 100644
index 000000000..505dd36dd
--- /dev/null
+++ b/weed/storage/needle_map.go
@@ -0,0 +1,99 @@
+package storage
+
+import (
+ //"log"
+ "os"
+ "weed/util"
+)
+
+type NeedleMap struct {
+ indexFile *os.File
+ m CompactMap
+
+ //transient
+ bytes []byte
+
+ deletionCounter int
+ fileCounter int
+ deletionByteCounter uint64
+ fileByteCounter uint64
+}
+
+func NewNeedleMap(file *os.File) *NeedleMap {
+ nm := &NeedleMap{
+ m: NewCompactMap(),
+ bytes: make([]byte, 16),
+ indexFile: file,
+ }
+ return nm
+}
+
+const (
+ RowsToRead = 1024
+)
+
+func LoadNeedleMap(file *os.File) *NeedleMap {
+ nm := NewNeedleMap(file)
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := nm.indexFile.Read(bytes)
+ for count > 0 && e == nil {
+ for i := 0; i < count; i += 16 {
+ key := util.BytesToUint64(bytes[i : i+8])
+ offset := util.BytesToUint32(bytes[i+8 : i+12])
+ size := util.BytesToUint32(bytes[i+12 : i+16])
+ nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ oldSize := nm.m.Delete(Key(key))
+ //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
+ }
+ }
+
+ count, e = nm.indexFile.Read(bytes)
+ }
+ return nm
+}
+
+func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ util.Uint64toBytes(nm.bytes[0:8], key)
+ util.Uint32toBytes(nm.bytes[8:12], offset)
+ util.Uint32toBytes(nm.bytes[12:16], size)
+ nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
+ if oldSize > 0 {
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
+ }
+ return nm.indexFile.Write(nm.bytes)
+}
+func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ element, ok = nm.m.Get(Key(key))
+ return
+}
+func (nm *NeedleMap) Delete(key uint64) {
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
+ util.Uint64toBytes(nm.bytes[0:8], key)
+ util.Uint32toBytes(nm.bytes[8:12], 0)
+ util.Uint32toBytes(nm.bytes[12:16], 0)
+ nm.indexFile.Write(nm.bytes)
+ nm.deletionCounter++
+}
+func (nm *NeedleMap) Close() {
+ nm.indexFile.Close()
+}
+func (nm *NeedleMap) ContentSize() uint64 {
+ return nm.fileByteCounter
+}
+func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
+ return nm.m.Visit(visit)
+}
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
new file mode 100644
index 000000000..1dac6d5c4
--- /dev/null
+++ b/weed/storage/needle_read_write.go
@@ -0,0 +1,238 @@
+package storage
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "weed/util"
+)
+
+const (
+ FlagGzip = 0x01
+ FlagHasName = 0x02
+ FlagHasMime = 0x04
+)
+
+func (n *Needle) DiskSize() uint32 {
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
+ return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize
+}
+func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
+ if s, ok := w.(io.Seeker); ok {
+ if end, e := s.Seek(0, 1); e == nil {
+ defer func(s io.Seeker, off int64) {
+ if err != nil {
+ if _, e = s.Seek(off, 0); e != nil {
+ fmt.Printf("Failed to seek back to %d with error: %s\n", w, off, e)
+ }
+ }
+ }(s, end)
+ } else {
+ err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e)
+ return
+ }
+ }
+ switch version {
+ case Version1:
+ header := make([]byte, NeedleHeaderSize)
+ util.Uint32toBytes(header[0:4], n.Cookie)
+ util.Uint64toBytes(header[4:12], n.Id)
+ n.Size = uint32(len(n.Data))
+ size = n.Size
+ util.Uint32toBytes(header[12:16], n.Size)
+ if _, err = w.Write(header); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Data); err != nil {
+ return
+ }
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
+ util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
+ _, err = w.Write(header[0 : NeedleChecksumSize+padding])
+ return
+ case Version2:
+ header := make([]byte, NeedleHeaderSize)
+ util.Uint32toBytes(header[0:4], n.Cookie)
+ util.Uint64toBytes(header[4:12], n.Id)
+ n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
+ if n.DataSize > 0 {
+ n.Size = 4 + n.DataSize + 1
+ if n.HasName() {
+ n.Size = n.Size + 1 + uint32(n.NameSize)
+ }
+ if n.HasMime() {
+ n.Size = n.Size + 1 + uint32(n.MimeSize)
+ }
+ }
+ size = n.DataSize
+ util.Uint32toBytes(header[12:16], n.Size)
+ if _, err = w.Write(header); err != nil {
+ return
+ }
+ if n.DataSize > 0 {
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ if _, err = w.Write(header[0:4]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Data); err != nil {
+ return
+ }
+ util.Uint8toBytes(header[0:1], n.Flags)
+ if _, err = w.Write(header[0:1]); err != nil {
+ return
+ }
+ }
+ if n.HasName() {
+ util.Uint8toBytes(header[0:1], n.NameSize)
+ if _, err = w.Write(header[0:1]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Name); err != nil {
+ return
+ }
+ }
+ if n.HasMime() {
+ util.Uint8toBytes(header[0:1], n.MimeSize)
+ if _, err = w.Write(header[0:1]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Mime); err != nil {
+ return
+ }
+ }
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
+ util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
+ _, err = w.Write(header[0 : NeedleChecksumSize+padding])
+ return n.DataSize, err
+ }
+ return 0, fmt.Errorf("Unsupported Version! (%d)", version)
+}
+
+func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) {
+ switch version {
+ case Version1:
+ bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
+ if ret, err = r.Read(bytes); err != nil {
+ return
+ }
+ n.readNeedleHeader(bytes)
+ n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
+ checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
+ if checksum != NewCRC(n.Data).Value() {
+ return 0, errors.New("CRC error! Data On Disk Corrupted!")
+ }
+ return
+ case Version2:
+ if size == 0 {
+ return 0, nil
+ }
+ bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
+ if ret, err = r.Read(bytes); err != nil {
+ return
+ }
+ if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) {
+ return 0, errors.New("File Entry Not Found!")
+ }
+ n.readNeedleHeader(bytes)
+ if n.Size != size {
+ return 0, fmt.Errorf("File Entry Not Found! Needle %d Memory %d", n.Size, size)
+ }
+ n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
+ checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize])
+ if checksum != NewCRC(n.Data).Value() {
+ return 0, errors.New("CRC error! Data On Disk Corrupted!")
+ }
+ return
+ }
+ return 0, fmt.Errorf("Unsupported Version! (%d)", version)
+}
+func (n *Needle) readNeedleHeader(bytes []byte) {
+ n.Cookie = util.BytesToUint32(bytes[0:4])
+ n.Id = util.BytesToUint64(bytes[4:12])
+ n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
+}
+func (n *Needle) readNeedleDataVersion2(bytes []byte) {
+ index, lenBytes := 0, len(bytes)
+ if index < lenBytes {
+ n.DataSize = util.BytesToUint32(bytes[index : index+4])
+ index = index + 4
+ n.Data = bytes[index : index+int(n.DataSize)]
+ index = index + int(n.DataSize)
+ n.Flags = bytes[index]
+ index = index + 1
+ }
+ if index < lenBytes && n.HasName() {
+ n.NameSize = uint8(bytes[index])
+ index = index + 1
+ n.Name = bytes[index : index+int(n.NameSize)]
+ index = index + int(n.NameSize)
+ }
+ if index < lenBytes && n.HasMime() {
+ n.MimeSize = uint8(bytes[index])
+ index = index + 1
+ n.Mime = bytes[index : index+int(n.MimeSize)]
+ }
+}
+
+func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) {
+ n = new(Needle)
+ if version == Version1 || version == Version2 {
+ bytes := make([]byte, NeedleHeaderSize)
+ var count int
+ count, err = r.Read(bytes)
+ if count <= 0 || err != nil {
+ return nil, 0, err
+ }
+ n.readNeedleHeader(bytes)
+ padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
+ bodyLength = n.Size + NeedleChecksumSize + padding
+ }
+ return
+}
+
+//n should be a needle already read the header
+//the input stream will read until next file entry
+func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) {
+ if bodyLength <= 0 {
+ return nil
+ }
+ switch version {
+ case Version1:
+ bytes := make([]byte, bodyLength)
+ if _, err = r.Read(bytes); err != nil {
+ return
+ }
+ n.Data = bytes[:n.Size]
+ n.Checksum = NewCRC(n.Data)
+ case Version2:
+ bytes := make([]byte, bodyLength)
+ if _, err = r.Read(bytes); err != nil {
+ return
+ }
+ n.readNeedleDataVersion2(bytes[0:n.Size])
+ n.Checksum = NewCRC(n.Data)
+ default:
+ err = fmt.Errorf("Unsupported Version! (%d)", version)
+ }
+ return
+}
+
+func (n *Needle) IsGzipped() bool {
+ return n.Flags&FlagGzip > 0
+}
+func (n *Needle) SetGzipped() {
+ n.Flags = n.Flags | FlagGzip
+}
+func (n *Needle) HasName() bool {
+ return n.Flags&FlagHasName > 0
+}
+func (n *Needle) SetHasName() {
+ n.Flags = n.Flags | FlagHasName
+}
+func (n *Needle) HasMime() bool {
+ return n.Flags&FlagHasMime > 0
+}
+func (n *Needle) SetHasMime() {
+ n.Flags = n.Flags | FlagHasMime
+}
diff --git a/weed/storage/replication_type.go b/weed/storage/replication_type.go
new file mode 100644
index 000000000..0902d1016
--- /dev/null
+++ b/weed/storage/replication_type.go
@@ -0,0 +1,123 @@
+package storage
+
+import (
+ "errors"
+)
+
+type ReplicationType string
+
+const (
+ Copy000 = ReplicationType("000") // single copy
+ Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
+ Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
+ Copy100 = ReplicationType("100") // 2 copies, each on different data center
+ Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
+ Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
+ LengthRelicationType = 6
+ CopyNil = ReplicationType(255) // nil value
+)
+
+func NewReplicationTypeFromString(t string) (ReplicationType, error) {
+ switch t {
+ case "000":
+ return Copy000, nil
+ case "001":
+ return Copy001, nil
+ case "010":
+ return Copy010, nil
+ case "100":
+ return Copy100, nil
+ case "110":
+ return Copy110, nil
+ case "200":
+ return Copy200, nil
+ }
+ return Copy000, errors.New("Unknown Replication Type:" + t)
+}
+func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
+ switch b {
+ case byte(000):
+ return Copy000, nil
+ case byte(001):
+ return Copy001, nil
+ case byte(010):
+ return Copy010, nil
+ case byte(100):
+ return Copy100, nil
+ case byte(110):
+ return Copy110, nil
+ case byte(200):
+ return Copy200, nil
+ }
+ return Copy000, errors.New("Unknown Replication Type:" + string(b))
+}
+
+func (r *ReplicationType) String() string {
+ switch *r {
+ case Copy000:
+ return "000"
+ case Copy001:
+ return "001"
+ case Copy010:
+ return "010"
+ case Copy100:
+ return "100"
+ case Copy110:
+ return "110"
+ case Copy200:
+ return "200"
+ }
+ return "000"
+}
+func (r *ReplicationType) Byte() byte {
+ switch *r {
+ case Copy000:
+ return byte(000)
+ case Copy001:
+ return byte(001)
+ case Copy010:
+ return byte(010)
+ case Copy100:
+ return byte(100)
+ case Copy110:
+ return byte(110)
+ case Copy200:
+ return byte(200)
+ }
+ return byte(000)
+}
+
+func (repType ReplicationType) GetReplicationLevelIndex() int {
+ switch repType {
+ case Copy000:
+ return 0
+ case Copy001:
+ return 1
+ case Copy010:
+ return 2
+ case Copy100:
+ return 3
+ case Copy110:
+ return 4
+ case Copy200:
+ return 5
+ }
+ return -1
+}
+func (repType ReplicationType) GetCopyCount() int {
+ switch repType {
+ case Copy000:
+ return 1
+ case Copy001:
+ return 2
+ case Copy010:
+ return 2
+ case Copy100:
+ return 2
+ case Copy110:
+ return 3
+ case Copy200:
+ return 3
+ }
+ return 0
+}
diff --git a/weed/storage/sample.idx b/weed/storage/sample.idx
new file mode 100644
index 000000000..44918b41d
--- /dev/null
+++ b/weed/storage/sample.idx
Binary files differ
diff --git a/weed/storage/store.go b/weed/storage/store.go
new file mode 100644
index 000000000..b016a6491
--- /dev/null
+++ b/weed/storage/store.go
@@ -0,0 +1,204 @@
+package storage
+
+import (
+ "encoding/json"
+ "errors"
+ "io/ioutil"
+ "log"
+ "net/url"
+ "weed/util"
+ "strconv"
+ "strings"
+)
+
+type Store struct {
+ volumes map[VolumeId]*Volume
+ dir string
+ Port int
+ Ip string
+ PublicUrl string
+ MaxVolumeCount int
+
+ masterNode string
+ connected bool
+ volumeSizeLimit uint64 //read from the master
+
+}
+
+func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
+ s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
+ s.volumes = make(map[VolumeId]*Volume)
+ s.loadExistingVolumes()
+
+ log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
+ return
+}
+func (s *Store) AddVolume(volumeListString string, replicationType string) error {
+ rt, e := NewReplicationTypeFromString(replicationType)
+ if e != nil {
+ return e
+ }
+ for _, range_string := range strings.Split(volumeListString, ",") {
+ if strings.Index(range_string, "-") < 0 {
+ id_string := range_string
+ id, err := NewVolumeId(id_string)
+ if err != nil {
+ return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
+ }
+ e = s.addVolume(VolumeId(id), rt)
+ } else {
+ pair := strings.Split(range_string, "-")
+ start, start_err := strconv.ParseUint(pair[0], 10, 64)
+ if start_err != nil {
+ return errors.New("Volume Start Id" + pair[0] + " is not a valid unsigned integer!")
+ }
+ end, end_err := strconv.ParseUint(pair[1], 10, 64)
+ if end_err != nil {
+ return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!")
+ }
+ for id := start; id <= end; id++ {
+ if err := s.addVolume(VolumeId(id), rt); err != nil {
+ e = err
+ }
+ }
+ }
+ }
+ return e
+}
+func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
+ if s.volumes[vid] != nil {
+ return errors.New("Volume Id " + vid.String() + " already exists!")
+ }
+ log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
+ s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
+ return err
+}
+
+func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false
+ }
+ garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
+ if e != nil {
+ return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false
+ }
+ return nil, garbageThreshold < s.volumes[vid].garbageLevel()
+}
+func (s *Store) CompactVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+ }
+ return s.volumes[vid].compact()
+}
+func (s *Store) CommitCompactVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+ }
+ return s.volumes[vid].commitCompact()
+}
+func (s *Store) loadExistingVolumes() {
+ if dirs, err := ioutil.ReadDir(s.dir); err == nil {
+ for _, dir := range dirs {
+ name := dir.Name()
+ if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
+ base := name[:len(name)-len(".dat")]
+ if vid, err := NewVolumeId(base); err == nil {
+ if s.volumes[vid] == nil {
+ if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
+ s.volumes[vid] = v
+ log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
+ }
+ }
+ }
+ }
+ }
+ }
+}
+func (s *Store) Status() []*VolumeInfo {
+ var stats []*VolumeInfo
+ for k, v := range s.volumes {
+ s := new(VolumeInfo)
+ s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
+ VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ stats = append(stats, s)
+ }
+ return stats
+}
+
+type JoinResult struct {
+ VolumeSizeLimit uint64
+}
+
+func (s *Store) SetMaster(mserver string) {
+ s.masterNode = mserver
+}
+func (s *Store) Join() error {
+ stats := new([]*VolumeInfo)
+ for k, v := range s.volumes {
+ s := new(VolumeInfo)
+ s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
+ VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ *stats = append(*stats, s)
+ }
+ bytes, _ := json.Marshal(stats)
+ values := make(url.Values)
+ if !s.connected {
+ values.Add("init", "true")
+ }
+ values.Add("port", strconv.Itoa(s.Port))
+ values.Add("ip", s.Ip)
+ values.Add("publicUrl", s.PublicUrl)
+ values.Add("volumes", string(bytes))
+ values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+ jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
+ if err != nil {
+ return err
+ }
+ var ret JoinResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ s.volumeSizeLimit = ret.VolumeSizeLimit
+ s.connected = true
+ return nil
+}
+func (s *Store) Close() {
+ for _, v := range s.volumes {
+ v.Close()
+ }
+}
+func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
+ if v := s.volumes[i]; v != nil {
+ size, err = v.write(n)
+ if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
+ log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
+ s.Join()
+ }
+ return
+ }
+ log.Println("volume", i, "not found!")
+ return
+}
+func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
+ if v := s.volumes[i]; v != nil {
+ return v.delete(n)
+ }
+ return 0, nil
+}
+func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
+ if v := s.volumes[i]; v != nil {
+ return v.read(n)
+ }
+ return 0, errors.New("Not Found")
+}
+func (s *Store) GetVolume(i VolumeId) *Volume {
+ return s.volumes[i]
+}
+
+func (s *Store) HasVolume(i VolumeId) bool {
+ _, ok := s.volumes[i]
+ return ok
+}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
new file mode 100644
index 000000000..707c6e6f8
--- /dev/null
+++ b/weed/storage/volume.go
@@ -0,0 +1,274 @@
+package storage
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "sync"
+)
+
+const (
+ SuperBlockSize = 8
+)
+
+type SuperBlock struct {
+ Version Version
+ ReplicaType ReplicationType
+}
+
+func (s *SuperBlock) Bytes() []byte {
+ header := make([]byte, SuperBlockSize)
+ header[0] = byte(s.Version)
+ header[1] = s.ReplicaType.Byte()
+ return header
+}
+
+type Volume struct {
+ Id VolumeId
+ dir string
+ dataFile *os.File
+ nm *NeedleMap
+
+ SuperBlock
+
+ accessLock sync.Mutex
+}
+
+func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
+ v = &Volume{dir: dirname, Id: id}
+ v.SuperBlock = SuperBlock{ReplicaType: replicationType}
+ e = v.load(true)
+ return
+}
+func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) {
+ v = &Volume{dir: dirname, Id: id}
+ v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
+ e = v.load(false)
+ return
+}
+func (v *Volume) load(alsoLoadIndex bool) error {
+ var e error
+ fileName := path.Join(v.dir, v.Id.String())
+ v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ if e != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
+ if v.ReplicaType == CopyNil {
+ if e = v.readSuperBlock(); e != nil {
+ return e
+ }
+ } else {
+ v.maybeWriteSuperBlock()
+ }
+ if alsoLoadIndex {
+ indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+ if ie != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
+ v.nm = LoadNeedleMap(indexFile)
+ }
+ return nil
+}
+func (v *Volume) Version() Version {
+ return v.SuperBlock.Version
+}
+func (v *Volume) Size() int64 {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ stat, e := v.dataFile.Stat()
+ if e == nil {
+ return stat.Size()
+ }
+ fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
+ return -1
+}
+func (v *Volume) Close() {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ v.nm.Close()
+ v.dataFile.Close()
+}
+func (v *Volume) maybeWriteSuperBlock() {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e)
+ return
+ }
+ if stat.Size() == 0 {
+ v.SuperBlock.Version = CurrentVersion
+ v.dataFile.Write(v.SuperBlock.Bytes())
+ }
+}
+func (v *Volume) readSuperBlock() (err error) {
+ v.dataFile.Seek(0, 0)
+ header := make([]byte, SuperBlockSize)
+ if _, e := v.dataFile.Read(header); e != nil {
+ return fmt.Errorf("cannot read superblock: %s", e)
+ }
+ v.SuperBlock, err = ParseSuperBlock(header)
+ return err
+}
+func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
+ superBlock.Version = Version(header[0])
+ if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
+ err = fmt.Errorf("cannot read replica type: %s", err)
+ }
+ return
+}
+func (v *Volume) NeedToReplicate() bool {
+ return v.ReplicaType.GetCopyCount() > 1
+}
+
+func (v *Volume) write(n *Needle) (size uint32, err error) {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ var offset int64
+ if offset, err = v.dataFile.Seek(0, 2); err != nil {
+ return
+ }
+ if size, err = n.Append(v.dataFile, v.Version()); err != nil {
+ return
+ }
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
+ _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
+ }
+ return
+}
+func (v *Volume) delete(n *Needle) (uint32, error) {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ nv, ok := v.nm.Get(n.Id)
+ //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ if ok {
+ v.nm.Delete(n.Id)
+ v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0)
+ _, err := n.Append(v.dataFile, v.Version())
+ return nv.Size, err
+ }
+ return 0, nil
+}
+
+func (v *Volume) read(n *Needle) (int, error) {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ nv, ok := v.nm.Get(n.Id)
+ if ok && nv.Offset > 0 {
+ v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
+ return n.Read(v.dataFile, nv.Size, v.Version())
+ }
+ return -1, errors.New("Not Found")
+}
+
+func (v *Volume) garbageLevel() float64 {
+ return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
+}
+
+func (v *Volume) compact() error {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+
+ filePath := path.Join(v.dir, v.Id.String())
+ return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
+}
+func (v *Volume) commitCompact() error {
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ v.dataFile.Close()
+ var e error
+ if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
+ return e
+ }
+ if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
+ return e
+ }
+ if e = v.load(true); e != nil {
+ return e
+ }
+ return nil
+}
+
+func ScanVolumeFile(dirname string, id VolumeId,
+ visitSuperBlock func(SuperBlock) error,
+ visitNeedle func(n *Needle, offset uint32) error) (err error) {
+ var v *Volume
+ if v, err = LoadVolumeOnly(dirname, id); err != nil {
+ return
+ }
+ if err = visitSuperBlock(v.SuperBlock); err != nil {
+ return
+ }
+
+ version := v.Version()
+
+ offset := uint32(SuperBlockSize)
+ n, rest, e := ReadNeedleHeader(v.dataFile, version)
+ if e != nil {
+ err = fmt.Errorf("cannot read needle header: %s", e)
+ return
+ }
+ for n != nil {
+ if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
+ err = fmt.Errorf("cannot read needle body: %s", err)
+ return
+ }
+ if err = visitNeedle(n, offset); err != nil {
+ return
+ }
+ offset += NeedleHeaderSize + rest
+ if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read needle header: %s", err)
+ }
+ }
+
+ return
+}
+
+func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
+ var (
+ dst, idx *os.File
+ )
+ if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
+ return
+ }
+ defer dst.Close()
+
+ if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
+ return
+ }
+ defer idx.Close()
+
+ nm := NewNeedleMap(idx)
+ new_offset := uint32(SuperBlockSize)
+
+ err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
+ _, err = dst.Write(superBlock.Bytes())
+ return err
+ }, func(n *Needle, offset uint32) error {
+ nv, ok := v.nm.Get(n.Id)
+ //log.Println("file size is", n.Size, "rest", rest)
+ if ok && nv.Offset*NeedlePaddingSize == offset {
+ if nv.Size > 0 {
+ if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
+ }
+ if _, err = n.Append(dst, v.Version()); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
+ }
+ new_offset += n.DiskSize()
+ //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
+ }
+ }
+ return nil
+ })
+
+ return
+}
+func (v *Volume) ContentSize() uint64 {
+ return v.nm.fileByteCounter
+}
diff --git a/weed/storage/volume_id.go b/weed/storage/volume_id.go
new file mode 100644
index 000000000..0333c6cf0
--- /dev/null
+++ b/weed/storage/volume_id.go
@@ -0,0 +1,18 @@
+package storage
+
+import (
+ "strconv"
+)
+
+type VolumeId uint32
+
+func NewVolumeId(vid string) (VolumeId, error) {
+ volumeId, err := strconv.ParseUint(vid, 10, 64)
+ return VolumeId(volumeId), err
+}
+func (vid *VolumeId) String() string {
+ return strconv.FormatUint(uint64(*vid), 10)
+}
+func (vid *VolumeId) Next() VolumeId {
+ return VolumeId(uint32(*vid) + 1)
+}
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
new file mode 100644
index 000000000..e4c5f6ec4
--- /dev/null
+++ b/weed/storage/volume_info.go
@@ -0,0 +1,13 @@
+package storage
+
+import ()
+
+type VolumeInfo struct {
+ Id VolumeId
+ Size uint64
+ RepType ReplicationType
+ Version Version
+ FileCount int
+ DeleteCount int
+ DeletedByteCount uint64
+}
diff --git a/weed/storage/volume_version.go b/weed/storage/volume_version.go
new file mode 100644
index 000000000..9702ae904
--- /dev/null
+++ b/weed/storage/volume_version.go
@@ -0,0 +1,11 @@
+package storage
+
+import ()
+
+type Version uint8
+
+const (
+ Version1 = Version(1)
+ Version2 = Version(2)
+ CurrentVersion = Version2
+)