Diffstat (limited to 'weed/storage')
-rw-r--r--  weed/storage/compact_map.go  207
-rw-r--r--  weed/storage/compact_map_perf_test.go  45
-rw-r--r--  weed/storage/compact_map_test.go  77
-rw-r--r--  weed/storage/crc.go  30
-rw-r--r--  weed/storage/disk_location.go  73
-rw-r--r--  weed/storage/file_id.go  43
-rw-r--r--  weed/storage/needle.go  231
-rw-r--r--  weed/storage/needle_byte_cache.go  75
-rw-r--r--  weed/storage/needle_map.go  123
-rw-r--r--  weed/storage/needle_map_boltdb.go  165
-rw-r--r--  weed/storage/needle_map_leveldb.go  134
-rw-r--r--  weed/storage/needle_map_memory.go  106
-rw-r--r--  weed/storage/needle_read_write.go  291
-rw-r--r--  weed/storage/needle_test.go  45
-rw-r--r--  weed/storage/replica_placement.go  53
-rw-r--r--  weed/storage/replica_placement_test.go  14
-rw-r--r--  weed/storage/store.go  340
-rw-r--r--  weed/storage/store_vacuum.go  44
-rw-r--r--  weed/storage/volume.go  430
-rw-r--r--  weed/storage/volume_id.go  18
-rw-r--r--  weed/storage/volume_info.go  65
-rw-r--r--  weed/storage/volume_info_test.go  23
-rw-r--r--  weed/storage/volume_super_block.go  81
-rw-r--r--  weed/storage/volume_super_block_test.go  23
-rw-r--r--  weed/storage/volume_sync.go  213
-rw-r--r--  weed/storage/volume_ttl.go  135
-rw-r--r--  weed/storage/volume_ttl_test.go  60
-rw-r--r--  weed/storage/volume_vacuum.go  93
-rw-r--r--  weed/storage/volume_version.go  9
29 files changed, 3246 insertions, 0 deletions
diff --git a/weed/storage/compact_map.go b/weed/storage/compact_map.go
new file mode 100644
index 000000000..d4438d044
--- /dev/null
+++ b/weed/storage/compact_map.go
@@ -0,0 +1,207 @@
+package storage
+
+import (
+ "strconv"
+ "sync"
+)
+
+type NeedleValue struct {
+ Key Key
+ Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
+ Size uint32 `comment:"Size of the data portion"`
+}
+
+const (
+ batch = 100000
+)
+
+type Key uint64
+
+func (k Key) String() string {
+ return strconv.FormatUint(uint64(k), 10)
+}
+
+type CompactSection struct {
+ sync.RWMutex
+ values []NeedleValue
+ overflow map[Key]NeedleValue
+ start Key
+ end Key
+ counter int
+}
+
+func NewCompactSection(start Key) *CompactSection {
+ return &CompactSection{
+ values: make([]NeedleValue, batch),
+ overflow: make(map[Key]NeedleValue),
+ start: start,
+ }
+}
+
+// Set inserts or updates an entry and returns the previous entry's size (0 if the key is new).
+func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 {
+ ret := uint32(0)
+ cs.Lock()
+ if key > cs.end {
+ cs.end = key
+ }
+ if i := cs.binarySearchValues(key); i >= 0 {
+ ret = cs.values[i].Size
+ //println("key", key, "old size", ret)
+ cs.values[i].Offset, cs.values[i].Size = offset, size
+ } else {
+ needOverflow := cs.counter >= batch
+ needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
+ if needOverflow {
+ //println("start", cs.start, "counter", cs.counter, "key", key)
+ if oldValue, found := cs.overflow[key]; found {
+ ret = oldValue.Size
+ }
+ cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
+ } else {
+ p := &cs.values[cs.counter]
+ p.Key, p.Offset, p.Size = key, offset, size
+ //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
+ cs.counter++
+ }
+ }
+ cs.Unlock()
+ return ret
+}
+
+// Delete removes an entry and returns the size it occupied (0 if the key was absent).
+func (cs *CompactSection) Delete(key Key) uint32 {
+ cs.Lock()
+ ret := uint32(0)
+ if i := cs.binarySearchValues(key); i >= 0 {
+ if cs.values[i].Size > 0 {
+ ret = cs.values[i].Size
+ cs.values[i].Size = 0
+ }
+ }
+ if v, found := cs.overflow[key]; found {
+ delete(cs.overflow, key)
+ ret = v.Size
+ }
+ cs.Unlock()
+ return ret
+}
+func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
+ cs.RLock()
+ if v, ok := cs.overflow[key]; ok {
+ cs.RUnlock()
+ return &v, true
+ }
+ if i := cs.binarySearchValues(key); i >= 0 {
+ cs.RUnlock()
+ return &cs.values[i], true
+ }
+ cs.RUnlock()
+ return nil, false
+}
+func (cs *CompactSection) binarySearchValues(key Key) int {
+ l, h := 0, cs.counter-1
+ if h >= 0 && cs.values[h].Key < key {
+ // key sorts after every stored key; -2 tells Set to fall through to the overflow map
+ return -2
+ }
+ //println("looking for key", key)
+ for l <= h {
+ m := (l + h) / 2
+ //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
+ if cs.values[m].Key < key {
+ l = m + 1
+ } else if key < cs.values[m].Key {
+ h = m - 1
+ } else {
+ //println("found", m)
+ return m
+ }
+ }
+ return -1
+}
+
+// CompactMap assumes keys are mostly inserted in increasing order.
+type CompactMap struct {
+ list []*CompactSection
+}
+
+func NewCompactMap() CompactMap {
+ return CompactMap{}
+}
+
+func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ //println(x, "creating", len(cm.list), "section, starting", key)
+ cm.list = append(cm.list, NewCompactSection(key))
+ x = len(cm.list) - 1
+ //keep compact section sorted by start
+ for x > 0 {
+ if cm.list[x-1].start > cm.list[x].start {
+ cm.list[x-1], cm.list[x] = cm.list[x], cm.list[x-1]
+ x = x - 1
+ } else {
+ break
+ }
+ }
+ }
+ return cm.list[x].Set(key, offset, size)
+}
+func (cm *CompactMap) Delete(key Key) uint32 {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ return uint32(0)
+ }
+ return cm.list[x].Delete(key)
+}
+func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
+ x := cm.binarySearchCompactSection(key)
+ if x < 0 {
+ return nil, false
+ }
+ return cm.list[x].Get(key)
+}
+// binarySearchCompactSection returns the index of the section that should hold key,
+// or a negative sentinel when no section qualifies.
+func (cm *CompactMap) binarySearchCompactSection(key Key) int {
+ l, h := 0, len(cm.list)-1
+ if h < 0 {
+ return -5
+ }
+ if cm.list[h].start <= key {
+ if cm.list[h].counter < batch || key <= cm.list[h].end {
+ return h
+ }
+ return -4
+ }
+ for l <= h {
+ m := (l + h) / 2
+ if key < cm.list[m].start {
+ h = m - 1
+ } else { // cm.list[m].start <= key
+ if cm.list[m+1].start <= key {
+ l = m + 1
+ } else {
+ return m
+ }
+ }
+ }
+ return -3
+}
+
+// Visit visits every entry, stopping at the first error returned by visit.
+func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
+ for _, cs := range cm.list {
+ for _, v := range cs.overflow {
+ if err := visit(v); err != nil {
+ return err
+ }
+ }
+ for _, v := range cs.values {
+ if _, found := cs.overflow[v.Key]; !found {
+ if err := visit(v); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ return nil
+}
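A minimal usage sketch for the CompactMap API above, written as an in-package test (the test name and values are illustrative, not part of this commit); note that Offset is stored in 8-byte units per the NeedleValue comment:

    package storage

    import "testing"

    func TestCompactMapUsageSketch(t *testing.T) {
        m := NewCompactMap()
        // a fresh key has no previous entry, so Set returns 0
        if prev := m.Set(Key(137), 8, 4096); prev != 0 {
            t.Fatal("expected no previous entry for a fresh key")
        }
        nv, ok := m.Get(Key(137))
        if !ok || nv.Size != 4096 {
            t.Fatal("expected to find the entry just set")
        }
        // Delete reports the size being freed
        if freed := m.Delete(Key(137)); freed != 4096 {
            t.Fatal("unexpected freed size")
        }
    }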
diff --git a/weed/storage/compact_map_perf_test.go b/weed/storage/compact_map_perf_test.go
new file mode 100644
index 000000000..cc7669139
--- /dev/null
+++ b/weed/storage/compact_map_perf_test.go
@@ -0,0 +1,45 @@
+package storage
+
+import (
+ "log"
+ "os"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestMemoryUsage(t *testing.T) {
+
+ indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDONLY, 0644)
+ if ie != nil {
+ t.Fatal(ie)
+ }
+ LoadNewNeedleMap(indexFile)
+
+}
+
+func LoadNewNeedleMap(file *os.File) CompactMap {
+ m := NewCompactMap()
+ bytes := make([]byte, 16*1024)
+ count, e := file.Read(bytes)
+ if count > 0 {
+ fstat, _ := file.Stat()
+ glog.V(0).Infoln("Loading index file", fstat.Name(), "size", fstat.Size())
+ }
+ for count > 0 && e == nil {
+ for i := 0; i < count; i += 16 {
+ key := util.BytesToUint64(bytes[i : i+8])
+ offset := util.BytesToUint32(bytes[i+8 : i+12])
+ size := util.BytesToUint32(bytes[i+12 : i+16])
+ if offset > 0 {
+ m.Set(Key(key), offset, size)
+ } else {
+ //delete(m, key)
+ }
+ }
+
+ count, e = file.Read(bytes)
+ }
+ return m
+}
diff --git a/weed/storage/compact_map_test.go b/weed/storage/compact_map_test.go
new file mode 100644
index 000000000..1ccb48edb
--- /dev/null
+++ b/weed/storage/compact_map_test.go
@@ -0,0 +1,77 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestIssue52(t *testing.T) {
+ m := NewCompactMap()
+ m.Set(Key(10002), 10002, 10002)
+ if element, ok := m.Get(Key(10002)); ok {
+ println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size)
+ }
+ m.Set(Key(10001), 10001, 10001)
+ if element, ok := m.Get(Key(10002)); ok {
+ println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size)
+ } else {
+ t.Fatal("key 10002 missing after setting 10001")
+ }
+}
+
+func TestXYZ(t *testing.T) {
+ m := NewCompactMap()
+ for i := uint32(0); i < 100*batch; i += 2 {
+ m.Set(Key(i), i, i)
+ }
+
+ for i := uint32(0); i < 100*batch; i += 37 {
+ m.Delete(Key(i))
+ }
+
+ for i := uint32(0); i < 10*batch; i += 3 {
+ m.Set(Key(i), i+11, i+5)
+ }
+
+ // for i := uint32(0); i < 100; i++ {
+ // if v := m.Get(Key(i)); v != nil {
+ // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size)
+ // }
+ // }
+
+ for i := uint32(0); i < 10*batch; i++ {
+ v, ok := m.Get(Key(i))
+ if i%3 == 0 {
+ if !ok {
+ t.Fatal("key", i, "missing!")
+ }
+ if v.Size != i+5 {
+ t.Fatal("key", i, "size", v.Size)
+ }
+ } else if i%37 == 0 {
+ if ok && v.Size > 0 {
+ t.Fatal("key", i, "should have been deleted needle value", v)
+ }
+ } else if i%2 == 0 {
+ if v.Size != i {
+ t.Fatal("key", i, "size", v.Size)
+ }
+ }
+ }
+
+ for i := uint32(10 * batch); i < 100*batch; i++ {
+ v, ok := m.Get(Key(i))
+ if i%37 == 0 {
+ if ok && v.Size > 0 {
+ t.Fatal("key", i, "should have been deleted needle value", v)
+ }
+ } else if i%2 == 0 {
+ if v == nil {
+ t.Fatal("key", i, "missing")
+ }
+ if v.Size != i {
+ t.Fatal("key", i, "size", v.Size)
+ }
+ }
+ }
+
+}
diff --git a/weed/storage/crc.go b/weed/storage/crc.go
new file mode 100644
index 000000000..494937784
--- /dev/null
+++ b/weed/storage/crc.go
@@ -0,0 +1,30 @@
+package storage
+
+import (
+ "fmt"
+ "github.com/klauspost/crc32"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var table = crc32.MakeTable(crc32.Castagnoli)
+
+type CRC uint32
+
+func NewCRC(b []byte) CRC {
+ return CRC(0).Update(b)
+}
+
+func (c CRC) Update(b []byte) CRC {
+ return CRC(crc32.Update(uint32(c), table, b))
+}
+
+func (c CRC) Value() uint32 {
+ return uint32(c>>15|c<<17) + 0xa282ead8
+}
+
+func (n *Needle) Etag() string {
+ bits := make([]byte, 4)
+ util.Uint32toBytes(bits, uint32(n.Checksum))
+ return fmt.Sprintf("\"%x\"", bits)
+}
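The Value method applies the same checksum masking used by the Snappy framing format and LevelDB (rotate right 15 bits, then add 0xa282ead8), which keeps checksums of checksum-bearing data well distributed. A small sketch of the incremental API (test name illustrative):

    package storage

    import "testing"

    func TestCRCIncrementalSketch(t *testing.T) {
        // feeding data in pieces must match the one-shot checksum
        oneShot := NewCRC([]byte("hello, needle")).Value()
        incremental := NewCRC([]byte("hello, ")).Update([]byte("needle")).Value()
        if oneShot != incremental {
            t.Fatal("incremental update should match one-shot checksum")
        }
    }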
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
new file mode 100644
index 000000000..cc3c83b63
--- /dev/null
+++ b/weed/storage/disk_location.go
@@ -0,0 +1,73 @@
+package storage
+
+import (
+ "io/ioutil"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type DiskLocation struct {
+ Directory string
+ MaxVolumeCount int
+ volumes map[VolumeId]*Volume
+}
+
+func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
+ location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
+ location.volumes = make(map[VolumeId]*Volume)
+ return location
+}
+
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
+
+ if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
+ for _, dir := range dirs {
+ name := dir.Name()
+ if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
+ collection := ""
+ base := name[:len(name)-len(".dat")]
+ i := strings.LastIndex(base, "_")
+ if i > 0 {
+ collection, base = base[0:i], base[i+1:]
+ }
+ if vid, err := NewVolumeId(base); err == nil {
+ if l.volumes[vid] == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
+ l.volumes[vid] = v
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
+ } else {
+ glog.V(0).Infof("new volume %s error %s", name, e)
+ }
+ }
+ }
+ }
+ }
+ }
+ glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
+}
+
+func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
+ for k, v := range l.volumes {
+ if v.Collection == collection {
+ e = l.deleteVolumeById(k)
+ if e != nil {
+ return
+ }
+ }
+ }
+ return
+}
+
+func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
+ v, ok := l.volumes[vid]
+ if !ok {
+ return
+ }
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(l.volumes, vid)
+ return
+}
diff --git a/weed/storage/file_id.go b/weed/storage/file_id.go
new file mode 100644
index 000000000..4cfdb16fa
--- /dev/null
+++ b/weed/storage/file_id.go
@@ -0,0 +1,43 @@
+package storage
+
+import (
+ "encoding/hex"
+ "errors"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type FileId struct {
+ VolumeId VolumeId
+ Key uint64
+ Hashcode uint32
+}
+
+func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId {
+ return &FileId{VolumeId: VolumeId, Key: n.Id, Hashcode: n.Cookie}
+}
+func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId {
+ return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
+}
+func ParseFileId(fid string) (*FileId, error) {
+ a := strings.Split(fid, ",")
+ if len(a) != 2 {
+ glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a))
+ return nil, errors.New("Invalid fid " + fid)
+ }
+ vid_string, key_hash_string := a[0], a[1]
+ volumeId, _ := NewVolumeId(vid_string)
+ key, hash, e := ParseKeyHash(key_hash_string)
+ return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e
+}
+func (n *FileId) String() string {
+ bytes := make([]byte, 12)
+ util.Uint64toBytes(bytes[0:8], n.Key)
+ util.Uint32toBytes(bytes[8:12], n.Hashcode)
+ nonzero_index := 0
+ for ; bytes[nonzero_index] == 0; nonzero_index++ {
+ }
+ return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:])
+}
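A round-trip sketch for the fid format: the part before the comma is the volume id, and the hex tail is the needle key followed by an 8-hex-digit cookie; String() trims the key's leading zero bytes. This assumes VolumeId round-trips plain decimal strings, as volume_id.go (in the diffstat above) suggests. Test name and fid value are illustrative:

    package storage

    import "testing"

    func TestFileIdRoundTripSketch(t *testing.T) {
        fid, err := ParseFileId("3,01637037d6")
        if err != nil {
            t.Fatal(err)
        }
        if fid.Key != 0x01 || fid.Hashcode != 0x637037d6 {
            t.Fatalf("unexpected parse result: %+v", fid)
        }
        if s := fid.String(); s != "3,01637037d6" {
            t.Fatalf("round trip mismatch: %s", s)
        }
    }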
diff --git a/weed/storage/needle.go b/weed/storage/needle.go
new file mode 100644
index 000000000..29549b323
--- /dev/null
+++ b/weed/storage/needle.go
@@ -0,0 +1,231 @@
+package storage
+
+import (
+ "fmt"
+ "io/ioutil"
+ "mime"
+ "net/http"
+ "path"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/images"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+)
+
+const (
+ NeedleHeaderSize = 16 //should never change this
+ NeedlePaddingSize = 8
+ NeedleChecksumSize = 4
+ MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
+)
+
+/*
+* A Needle represents an uploaded and stored file.
+* Needle file size is limited to 4GB for now.
+ */
+type Needle struct {
+ Cookie uint32 `comment:"random number to mitigate brute force lookups"`
+ Id uint64 `comment:"needle id"`
+ Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
+
+ DataSize uint32 `comment:"Data size"` //version2
+ Data []byte `comment:"The actual file data"`
+ Flags byte `comment:"boolean flags"` //version2
+ NameSize uint8 //version2
+ Name []byte `comment:"maximum 256 characters"` //version2
+ MimeSize uint8 //version2
+ Mime []byte `comment:"maximum 256 characters"` //version2
+ LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
+ Ttl *TTL
+
+ Checksum CRC `comment:"CRC32 to check integrity"`
+ Padding []byte `comment:"Aligned to 8 bytes"`
+
+ rawBlock *Block // underlying supporting []byte, fetched from and released into a pool
+}
+
+func (n *Needle) String() (str string) {
+ str = fmt.Sprintf("Cookie:%d, Id:%d, Size:%d, DataSize:%d, Name: %s, Mime: %s", n.Cookie, n.Id, n.Size, n.DataSize, n.Name, n.Mime)
+ return
+}
+
+func ParseUpload(r *http.Request) (
+ fileName string, data []byte, mimeType string, isGzipped bool,
+ modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
+ form, fe := r.MultipartReader()
+ if fe != nil {
+ glog.V(0).Infoln("MultipartReader [ERROR]", fe)
+ e = fe
+ return
+ }
+
+ //first multi-part item
+ part, fe := form.NextPart()
+ if fe != nil {
+ glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
+ e = fe
+ return
+ }
+
+ fileName = part.FileName()
+ if fileName != "" {
+ fileName = path.Base(fileName)
+ }
+
+ data, e = ioutil.ReadAll(part)
+ if e != nil {
+ glog.V(0).Infoln("Reading Content [ERROR]", e)
+ return
+ }
+
+ //if the filename is empty string, do a search on the other multi-part items
+ for fileName == "" {
+ part2, fe := form.NextPart()
+ if fe != nil {
+ break // no more parts, or an error; stop searching
+ }
+
+ fName := part2.FileName()
+
+ //use the first multi-part item that carries a filename
+ if fName != "" {
+ data2, fe2 := ioutil.ReadAll(part2)
+ if fe2 != nil {
+ glog.V(0).Infoln("Reading Content [ERROR]", fe2)
+ e = fe2
+ return
+ }
+
+ //update
+ data = data2
+ fileName = path.Base(fName)
+ break
+ }
+ }
+
+ dotIndex := strings.LastIndex(fileName, ".")
+ ext, mtype := "", ""
+ if dotIndex > 0 {
+ ext = strings.ToLower(fileName[dotIndex:])
+ mtype = mime.TypeByExtension(ext)
+ }
+ contentType := part.Header.Get("Content-Type")
+ if contentType != "" && mtype != contentType {
+ mimeType = contentType //only return the mime type if it cannot be deduced from the extension
+ mtype = contentType
+ }
+ if part.Header.Get("Content-Encoding") == "gzip" {
+ isGzipped = true
+ } else if operation.IsGzippable(ext, mtype) {
+ if data, e = operation.GzipData(data); e != nil {
+ return
+ }
+ isGzipped = true
+ }
+ if ext == ".gz" {
+ isGzipped = true
+ }
+ if strings.HasSuffix(fileName, ".gz") &&
+ !strings.HasSuffix(fileName, ".tar.gz") {
+ fileName = fileName[:len(fileName)-3]
+ }
+ modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ ttl, _ = ReadTTL(r.FormValue("ttl"))
+ isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
+ return
+}
+func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
+ fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
+ n = new(Needle)
+ fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
+ if e != nil {
+ return
+ }
+ if len(fname) < 256 {
+ n.Name = []byte(fname)
+ n.SetHasName()
+ }
+ if len(mimeType) < 256 {
+ n.Mime = []byte(mimeType)
+ n.SetHasMime()
+ }
+ if isGzipped {
+ n.SetGzipped()
+ }
+ if n.LastModified == 0 {
+ n.LastModified = uint64(time.Now().Unix())
+ }
+ n.SetHasLastModifiedDate()
+ if n.Ttl != EMPTY_TTL {
+ n.SetHasTtl()
+ }
+
+ if isChunkedFile {
+ n.SetIsChunkManifest()
+ }
+
+ if fixJpgOrientation {
+ loweredName := strings.ToLower(fname)
+ if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
+ n.Data = images.FixJpgOrientation(n.Data)
+ }
+ }
+
+ n.Checksum = NewCRC(n.Data)
+
+ commaSep := strings.LastIndex(r.URL.Path, ",")
+ dotSep := strings.LastIndex(r.URL.Path, ".")
+ fid := r.URL.Path[commaSep+1:]
+ if dotSep > 0 {
+ fid = r.URL.Path[commaSep+1 : dotSep]
+ }
+
+ e = n.ParsePath(fid)
+
+ return
+}
+func (n *Needle) ParsePath(fid string) (err error) {
+ length := len(fid)
+ if length <= 8 {
+ return fmt.Errorf("Invalid fid: %s", fid)
+ }
+ delta := ""
+ deltaIndex := strings.LastIndex(fid, "_")
+ if deltaIndex > 0 {
+ fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
+ }
+ n.Id, n.Cookie, err = ParseKeyHash(fid)
+ if err != nil {
+ return err
+ }
+ if delta != "" {
+ if d, e := strconv.ParseUint(delta, 10, 64); e == nil {
+ n.Id += d
+ } else {
+ return e
+ }
+ }
+ return err
+}
+
+func ParseKeyHash(key_hash_string string) (uint64, uint32, error) {
+ if len(key_hash_string) <= 8 {
+ return 0, 0, fmt.Errorf("KeyHash is too short.")
+ }
+ if len(key_hash_string) > 24 {
+ return 0, 0, fmt.Errorf("KeyHash is too long.")
+ }
+ split := len(key_hash_string) - 8
+ key, err := strconv.ParseUint(key_hash_string[:split], 16, 64)
+ if err != nil {
+ return 0, 0, fmt.Errorf("Parse key error: %v", err)
+ }
+ hash, err := strconv.ParseUint(key_hash_string[split:], 16, 32)
+ if err != nil {
+ return 0, 0, fmt.Errorf("Parse hash error: %v", err)
+ }
+ return key, uint32(hash), nil
+}
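ParsePath also accepts an underscore-delimited decimal suffix that is added to the parsed needle id, which lets consecutive chunk needles share one base fid. A sketch (test name illustrative):

    package storage

    import "testing"

    func TestParsePathDeltaSketch(t *testing.T) {
        n := new(Needle)
        if err := n.ParsePath("4ed4c8116e41_2"); err != nil {
            t.Fatal(err)
        }
        // key 0x4ed4 plus delta 2; the last 8 hex digits are the cookie
        if n.Id != 0x4ed4+2 || n.Cookie != 0xc8116e41 {
            t.Fatalf("unexpected id/cookie: %x %x", n.Id, n.Cookie)
        }
    }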
diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go
new file mode 100644
index 000000000..ae35a48ba
--- /dev/null
+++ b/weed/storage/needle_byte_cache.go
@@ -0,0 +1,75 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/hashicorp/golang-lru"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ bytesCache *lru.Cache
+ bytesPool *util.BytesPool
+)
+
+/*
+There is one level of caching and one level of pooling.
+
+In pooling, all []byte are fetched from and returned to the pool bytesPool.
+
+In caching, the mapping from a string key (fd:offset:size) to a *Block is kept in an LRU cache.
+*/
+func init() {
+ bytesPool = util.NewBytesPool()
+ bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) {
+ value.(*Block).decreaseReference()
+ })
+}
+
+type Block struct {
+ Bytes []byte
+ refCount int32
+}
+
+func (block *Block) decreaseReference() {
+ if atomic.AddInt32(&block.refCount, -1) == 0 {
+ bytesPool.Put(block.Bytes)
+ }
+}
+func (block *Block) increaseReference() {
+ atomic.AddInt32(&block.refCount, 1)
+}
+
+// get bytes from the LRU cache of []byte first, then from the bytes pool
+// when []byte in LRU cache is evicted, it will be put back to the bytes pool
+func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) {
+ // check cache, return if found
+ cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
+ if obj, found := bytesCache.Get(cacheKey); found {
+ block = obj.(*Block)
+ block.increaseReference()
+ dataSlice = block.Bytes[0:readSize]
+ return dataSlice, block, nil
+ }
+
+ // get the []byte from pool
+ b := bytesPool.Get(readSize)
+ // refCount = 2, one by the bytesCache, one by the actual needle object
+ block = &Block{Bytes: b, refCount: 2}
+ dataSlice = block.Bytes[0:readSize]
+ _, err = r.ReadAt(dataSlice, offset)
+ bytesCache.Add(cacheKey, block)
+ return dataSlice, block, err
+}
+
+func (n *Needle) ReleaseMemory() {
+ if n.rawBlock != nil {
+ n.rawBlock.decreaseReference()
+ }
+}
+func ReleaseBytes(b []byte) {
+ bytesPool.Put(b)
+}
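Every successful getBytesForFileBlock must eventually be balanced by one release on the caller's side (Needle.ReleaseMemory does this for needles); the LRU drops its own reference through the eviction callback, and the []byte returns to the pool only when both are gone. A lifecycle sketch against a temporary file (names illustrative):

    package storage

    import (
        "io/ioutil"
        "os"
        "testing"
    )

    func TestBlockLifecycleSketch(t *testing.T) {
        f, err := ioutil.TempFile("", "block_sketch")
        if err != nil {
            t.Fatal(err)
        }
        defer os.Remove(f.Name())
        defer f.Close()
        if _, err := f.WriteAt(make([]byte, 64), 0); err != nil {
            t.Fatal(err)
        }
        data, block, err := getBytesForFileBlock(f, 0, 64)
        if err != nil || len(data) != 64 {
            t.Fatalf("read failed: %v", err)
        }
        // drop the caller's reference; the LRU still holds the other one
        block.decreaseReference()
    }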
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
new file mode 100644
index 000000000..05bc6e86c
--- /dev/null
+++ b/weed/storage/needle_map.go
@@ -0,0 +1,123 @@
+package storage
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type NeedleMapType int
+
+const (
+ NeedleMapInMemory NeedleMapType = iota
+ NeedleMapLevelDb
+ NeedleMapBoltDb
+)
+
+type NeedleMapper interface {
+ Put(key uint64, offset uint32, size uint32) error
+ Get(key uint64) (element *NeedleValue, ok bool)
+ Delete(key uint64) error
+ Close()
+ Destroy() error
+ ContentSize() uint64
+ DeletedSize() uint64
+ FileCount() int
+ DeletedCount() int
+ MaxFileKey() uint64
+ IndexFileSize() uint64
+ IndexFileContent() ([]byte, error)
+ IndexFileName() string
+}
+
+type baseNeedleMapper struct {
+ indexFile *os.File
+ indexFileAccessLock sync.Mutex
+
+ mapMetric
+}
+
+func (nm *baseNeedleMapper) IndexFileSize() uint64 {
+ stat, err := nm.indexFile.Stat()
+ if err == nil {
+ return uint64(stat.Size())
+ }
+ return 0
+}
+
+func (nm *baseNeedleMapper) IndexFileName() string {
+ return nm.indexFile.Name()
+}
+
+func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) {
+ key = util.BytesToUint64(bytes[:8])
+ offset = util.BytesToUint32(bytes[8:12])
+ size = util.BytesToUint32(bytes[12:16])
+ return
+}
+func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+
+ nm.indexFileAccessLock.Lock()
+ defer nm.indexFileAccessLock.Unlock()
+ if _, err := nm.indexFile.Seek(0, 2); err != nil {
+ return fmt.Errorf("cannot seek end of indexfile %s: %v",
+ nm.indexFile.Name(), err)
+ }
+ _, err := nm.indexFile.Write(bytes)
+ return err
+}
+func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) {
+ nm.indexFileAccessLock.Lock()
+ defer nm.indexFileAccessLock.Unlock()
+ return ioutil.ReadFile(nm.indexFile.Name())
+}
+
+type mapMetric struct {
+ indexFile *os.File
+
+ DeletionCounter int `json:"DeletionCounter"`
+ FileCounter int `json:"FileCounter"`
+ DeletionByteCounter uint64 `json:"DeletionByteCounter"`
+ FileByteCounter uint64 `json:"FileByteCounter"`
+ MaximumFileKey uint64 `json:"MaxFileKey"`
+}
+
+func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+ mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
+ mm.DeletionCounter++
+}
+
+func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
+ if key > mm.MaximumFileKey {
+ mm.MaximumFileKey = key
+ }
+ mm.FileCounter++
+ mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
+ if oldSize > 0 {
+ mm.DeletionCounter++
+ mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
+ }
+}
+
+func (mm mapMetric) ContentSize() uint64 {
+ return mm.FileByteCounter
+}
+func (mm mapMetric) DeletedSize() uint64 {
+ return mm.DeletionByteCounter
+}
+func (mm mapMetric) FileCount() int {
+ return mm.FileCounter
+}
+func (mm mapMetric) DeletedCount() int {
+ return mm.DeletionCounter
+}
+func (mm mapMetric) MaxFileKey() uint64 {
+ return mm.MaximumFileKey
+}
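Each index-file record is a fixed 16 bytes: key (8), offset (4), size (4); idxFileEntry is the exact inverse of the encoding in appendToIndexFile. A sketch (test name illustrative):

    package storage

    import (
        "testing"

        "github.com/chrislusf/seaweedfs/weed/util"
    )

    func TestIdxFileEntrySketch(t *testing.T) {
        b := make([]byte, 16)
        util.Uint64toBytes(b[0:8], 0x4ed4)
        util.Uint32toBytes(b[8:12], 8)
        util.Uint32toBytes(b[12:16], 4096)
        key, offset, size := idxFileEntry(b)
        if key != 0x4ed4 || offset != 8 || size != 4096 {
            t.Fatal("decode should invert the encode")
        }
    }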
diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go
new file mode 100644
index 000000000..bd3edf28d
--- /dev/null
+++ b/weed/storage/needle_map_boltdb.go
@@ -0,0 +1,165 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/boltdb/bolt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type BoltDbNeedleMap struct {
+ dbFileName string
+ db *bolt.DB
+ baseNeedleMapper
+}
+
+var boltdbBucket = []byte("weed")
+
+func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
+ m = &BoltDbNeedleMap{dbFileName: dbFileName}
+ m.indexFile = indexFile
+ if !isBoltDbFresh(dbFileName, indexFile) {
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ generateBoltDbFile(dbFileName, indexFile)
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", dbFileName)
+ if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil {
+ return
+ }
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ nm, indexLoadError := LoadNeedleMap(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = nm.mapMetric
+ return
+}
+
+func isBoltDbFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbLogFile, err := os.Open(dbFileName)
+ if err != nil {
+ return false
+ }
+ defer dbLogFile.Close()
+ dbStat, dbStatErr := dbLogFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
+ db, err := bolt.Open(dbFileName, 0644, nil)
+ if err != nil {
+ return err
+ }
+ defer db.Close()
+ return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+ if offset > 0 {
+ boltDbWrite(db, key, offset, size)
+ } else {
+ boltDbDelete(db, key)
+ }
+ return nil
+ })
+}
+
+func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ bytes := make([]byte, 8)
+ var data []byte
+ util.Uint64toBytes(bytes, key)
+ err := m.db.View(func(tx *bolt.Tx) error {
+ bucket := tx.Bucket(boltdbBucket)
+ if bucket == nil {
+ return fmt.Errorf("Bucket %q not found!", boltdbBucket)
+ }
+
+ data = bucket.Get(bytes)
+ return nil
+ })
+
+ if err != nil || len(data) != 8 {
+ return nil, false
+ }
+ offset := util.BytesToUint32(data[0:4])
+ size := util.BytesToUint32(data[4:8])
+ return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ var oldSize uint32
+ if oldNeedle, ok := m.Get(key); ok {
+ oldSize = oldNeedle.Size
+ }
+ m.logPut(key, oldSize, size)
+ // write to index file first
+ if err := m.appendToIndexFile(key, offset, size); err != nil {
+ return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+ }
+ return boltDbWrite(m.db, key, offset, size)
+}
+
+func boltDbWrite(db *bolt.DB,
+ key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+ return db.Update(func(tx *bolt.Tx) error {
+ bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
+ if err != nil {
+ return err
+ }
+
+ err = bucket.Put(bytes[0:8], bytes[8:16])
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+}
+func boltDbDelete(db *bolt.DB, key uint64) error {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ return db.Update(func(tx *bolt.Tx) error {
+ bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
+ if err != nil {
+ return err
+ }
+
+ err = bucket.Delete(bytes)
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+}
+
+func (m *BoltDbNeedleMap) Delete(key uint64) error {
+ if oldNeedle, ok := m.Get(key); ok {
+ m.logDelete(oldNeedle.Size)
+ }
+ // write to index file first
+ if err := m.appendToIndexFile(key, 0, 0); err != nil {
+ return err
+ }
+ return boltDbDelete(m.db, key)
+}
+
+func (m *BoltDbNeedleMap) Close() {
+ m.db.Close()
+}
+
+func (m *BoltDbNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.dbFileName)
+}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
new file mode 100644
index 000000000..1789dbb12
--- /dev/null
+++ b/weed/storage/needle_map_leveldb.go
@@ -0,0 +1,134 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+type LevelDbNeedleMap struct {
+ dbFileName string
+ db *leveldb.DB
+ baseNeedleMapper
+}
+
+func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
+ m = &LevelDbNeedleMap{dbFileName: dbFileName}
+ m.indexFile = indexFile
+ if !isLevelDbFresh(dbFileName, indexFile) {
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ generateLevelDbFile(dbFileName, indexFile)
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", dbFileName)
+ if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil {
+ return
+ }
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ nm, indexLoadError := LoadNeedleMap(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = nm.mapMetric
+ return
+}
+
+func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
+ if err != nil {
+ return false
+ }
+ defer dbLogFile.Close()
+ dbStat, dbStatErr := dbLogFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
+ db, err := leveldb.OpenFile(dbFileName, nil)
+ if err != nil {
+ return err
+ }
+ defer db.Close()
+ return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+ if offset > 0 {
+ levelDbWrite(db, key, offset, size)
+ } else {
+ levelDbDelete(db, key)
+ }
+ return nil
+ })
+}
+
+func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ data, err := m.db.Get(bytes, nil)
+ if err != nil || len(data) != 8 {
+ return nil, false
+ }
+ offset := util.BytesToUint32(data[0:4])
+ size := util.BytesToUint32(data[4:8])
+ return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ var oldSize uint32
+ if oldNeedle, ok := m.Get(key); ok {
+ oldSize = oldNeedle.Size
+ }
+ m.logPut(key, oldSize, size)
+ // write to index file first
+ if err := m.appendToIndexFile(key, offset, size); err != nil {
+ return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+ }
+ return levelDbWrite(m.db, key, offset, size)
+}
+
+func levelDbWrite(db *leveldb.DB,
+ key uint64, offset uint32, size uint32) error {
+ bytes := make([]byte, 16)
+ util.Uint64toBytes(bytes[0:8], key)
+ util.Uint32toBytes(bytes[8:12], offset)
+ util.Uint32toBytes(bytes[12:16], size)
+ if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
+ return fmt.Errorf("failed to write leveldb: %v", err)
+ }
+ return nil
+}
+func levelDbDelete(db *leveldb.DB, key uint64) error {
+ bytes := make([]byte, 8)
+ util.Uint64toBytes(bytes, key)
+ return db.Delete(bytes, nil)
+}
+
+func (m *LevelDbNeedleMap) Delete(key uint64) error {
+ if oldNeedle, ok := m.Get(key); ok {
+ m.logDelete(oldNeedle.Size)
+ }
+ // write to index file first
+ if err := m.appendToIndexFile(key, 0, 0); err != nil {
+ return err
+ }
+ return levelDbDelete(m.db, key)
+}
+
+func (m *LevelDbNeedleMap) Close() {
+ m.db.Close()
+}
+
+func (m *LevelDbNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.dbFileName)
+}
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
new file mode 100644
index 000000000..f2f4835df
--- /dev/null
+++ b/weed/storage/needle_map_memory.go
@@ -0,0 +1,106 @@
+package storage
+
+import (
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type NeedleMap struct {
+ m CompactMap
+
+ baseNeedleMapper
+}
+
+func NewNeedleMap(file *os.File) *NeedleMap {
+ nm := &NeedleMap{
+ m: NewCompactMap(),
+ }
+ nm.indexFile = file
+ return nm
+}
+
+const (
+ RowsToRead = 1024
+)
+
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
+ nm := NewNeedleMap(file)
+ e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
+ if key > nm.MaximumFileKey {
+ nm.MaximumFileKey = key
+ }
+ nm.FileCounter++
+ nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ oldSize := nm.m.Delete(Key(key))
+ glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ }
+ return nil
+ })
+ glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
+ return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
+ var readerOffset int64
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := r.ReadAt(bytes, readerOffset)
+ glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ readerOffset += int64(count)
+ var (
+ key uint64
+ offset, size uint32
+ i int
+ )
+
+ for (count > 0 && e == nil) || e == io.EOF {
+ for i = 0; i+16 <= count; i += 16 {
+ key, offset, size = idxFileEntry(bytes[i : i+16])
+ if e = fn(key, offset, size); e != nil {
+ return e
+ }
+ }
+ if e == io.EOF {
+ return nil
+ }
+ count, e = r.ReadAt(bytes, readerOffset)
+ glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ readerOffset += int64(count)
+ }
+ return e
+}
+
+func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ nm.logPut(key, oldSize, size)
+ return nm.appendToIndexFile(key, offset, size)
+}
+func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+ element, ok = nm.m.Get(Key(key))
+ return
+}
+func (nm *NeedleMap) Delete(key uint64) error {
+ deletedBytes := nm.m.Delete(Key(key))
+ nm.logDelete(deletedBytes)
+ return nm.appendToIndexFile(key, 0, 0)
+}
+func (nm *NeedleMap) Close() {
+ _ = nm.indexFile.Close()
+}
+func (nm *NeedleMap) Destroy() error {
+ nm.Close()
+ return os.Remove(nm.indexFile.Name())
+}
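WalkIndexFile is also usable on its own; a sketch that tallies puts against tombstones (offset > 0 marks a live write, offset == 0 a delete), using only the helpers above (function name illustrative):

    package storage

    import "os"

    func countIndexEntries(idx *os.File) (puts, deletes int, err error) {
        err = WalkIndexFile(idx, func(key uint64, offset, size uint32) error {
            if offset > 0 {
                puts++
            } else {
                deletes++
            }
            return nil
        })
        return
    }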
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
new file mode 100644
index 000000000..2f26147d6
--- /dev/null
+++ b/weed/storage/needle_read_write.go
@@ -0,0 +1,291 @@
+package storage
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ FlagGzip = 0x01
+ FlagHasName = 0x02
+ FlagHasMime = 0x04
+ FlagHasLastModifiedDate = 0x08
+ FlagHasTtl = 0x10
+ FlagIsChunkManifest = 0x80
+ LastModifiedBytesLength = 5
+ TtlBytesLength = 2
+)
+
+func (n *Needle) DiskSize() int64 {
+ // when the record is already 8-byte aligned, this yields a full 8-byte pad;
+ // Append uses the identical formula, so reads and writes agree
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize)
+ return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize
+}
+func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
+ if s, ok := w.(io.Seeker); ok {
+ if end, e := s.Seek(0, 1); e == nil {
+ defer func(s io.Seeker, off int64) {
+ if err != nil {
+ if _, e = s.Seek(off, 0); e != nil {
+ glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
+ }
+ }
+ }(s, end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
+ }
+ }
+ switch version {
+ case Version1:
+ header := make([]byte, NeedleHeaderSize)
+ util.Uint32toBytes(header[0:4], n.Cookie)
+ util.Uint64toBytes(header[4:12], n.Id)
+ n.Size = uint32(len(n.Data))
+ size = n.Size
+ util.Uint32toBytes(header[12:16], n.Size)
+ if _, err = w.Write(header); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Data); err != nil {
+ return
+ }
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
+ util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
+ _, err = w.Write(header[0 : NeedleChecksumSize+padding])
+ return
+ case Version2:
+ header := make([]byte, NeedleHeaderSize)
+ util.Uint32toBytes(header[0:4], n.Cookie)
+ util.Uint64toBytes(header[4:12], n.Id)
+ n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
+ if n.DataSize > 0 {
+ n.Size = 4 + n.DataSize + 1 // DataSize field (4 bytes) + data + Flags (1 byte)
+ if n.HasName() {
+ n.Size = n.Size + 1 + uint32(n.NameSize)
+ }
+ if n.HasMime() {
+ n.Size = n.Size + 1 + uint32(n.MimeSize)
+ }
+ if n.HasLastModifiedDate() {
+ n.Size = n.Size + LastModifiedBytesLength
+ }
+ if n.HasTtl() {
+ n.Size = n.Size + TtlBytesLength
+ }
+ } else {
+ n.Size = 0
+ }
+ size = n.DataSize
+ util.Uint32toBytes(header[12:16], n.Size)
+ if _, err = w.Write(header); err != nil {
+ return
+ }
+ if n.DataSize > 0 {
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ if _, err = w.Write(header[0:4]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Data); err != nil {
+ return
+ }
+ util.Uint8toBytes(header[0:1], n.Flags)
+ if _, err = w.Write(header[0:1]); err != nil {
+ return
+ }
+ if n.HasName() {
+ util.Uint8toBytes(header[0:1], n.NameSize)
+ if _, err = w.Write(header[0:1]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Name); err != nil {
+ return
+ }
+ }
+ if n.HasMime() {
+ util.Uint8toBytes(header[0:1], n.MimeSize)
+ if _, err = w.Write(header[0:1]); err != nil {
+ return
+ }
+ if _, err = w.Write(n.Mime); err != nil {
+ return
+ }
+ }
+ if n.HasLastModifiedDate() {
+ util.Uint64toBytes(header[0:8], n.LastModified)
+ if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
+ return
+ }
+ }
+ if n.HasTtl() && n.Ttl != nil {
+ n.Ttl.ToBytes(header[0:TtlBytesLength])
+ if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
+ return
+ }
+ }
+ }
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
+ util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
+ _, err = w.Write(header[0 : NeedleChecksumSize+padding])
+ return n.DataSize, err
+ }
+ return 0, fmt.Errorf("Unsupported Version! (%d)", version)
+}
+
+func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
+ padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
+ readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
+ return getBytesForFileBlock(r, offset, int(readSize))
+}
+
+func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
+ bytes, block, err := ReadNeedleBlob(r, offset, size)
+ if err != nil {
+ return err
+ }
+ n.rawBlock = block
+ n.ParseNeedleHeader(bytes)
+ if n.Size != size {
+ return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
+ }
+ switch version {
+ case Version1:
+ n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
+ case Version2:
+ n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
+ }
+ checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
+ newChecksum := NewCRC(n.Data)
+ if checksum != newChecksum.Value() {
+ return errors.New("CRC error! Data On Disk Corrupted")
+ }
+ n.Checksum = newChecksum
+ return nil
+}
+func (n *Needle) ParseNeedleHeader(bytes []byte) {
+ n.Cookie = util.BytesToUint32(bytes[0:4])
+ n.Id = util.BytesToUint64(bytes[4:12])
+ n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
+}
+func (n *Needle) readNeedleDataVersion2(bytes []byte) {
+ index, lenBytes := 0, len(bytes)
+ if index < lenBytes {
+ n.DataSize = util.BytesToUint32(bytes[index : index+4])
+ index = index + 4
+ if int(n.DataSize)+index > lenBytes {
+ // this if clause is due to bug #87 and #93, fixed in v0.69
+ // remove this clause later
+ return
+ }
+ n.Data = bytes[index : index+int(n.DataSize)]
+ index = index + int(n.DataSize)
+ n.Flags = bytes[index]
+ index = index + 1
+ }
+ if index < lenBytes && n.HasName() {
+ n.NameSize = uint8(bytes[index])
+ index = index + 1
+ n.Name = bytes[index : index+int(n.NameSize)]
+ index = index + int(n.NameSize)
+ }
+ if index < lenBytes && n.HasMime() {
+ n.MimeSize = uint8(bytes[index])
+ index = index + 1
+ n.Mime = bytes[index : index+int(n.MimeSize)]
+ index = index + int(n.MimeSize)
+ }
+ if index < lenBytes && n.HasLastModifiedDate() {
+ n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
+ index = index + LastModifiedBytesLength
+ }
+ if index < lenBytes && n.HasTtl() {
+ n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
+ index = index + TtlBytesLength
+ }
+}
+
+func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
+ n = new(Needle)
+ if version == Version1 || version == Version2 {
+ bytes := make([]byte, NeedleHeaderSize)
+ var count int
+ count, err = r.ReadAt(bytes, offset)
+ if count <= 0 || err != nil {
+ return nil, 0, err
+ }
+ n.ParseNeedleHeader(bytes)
+ padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
+ bodyLength = n.Size + NeedleChecksumSize + padding
+ }
+ return
+}
+
+//ReadNeedleBody reads the body of a needle whose header has already been read;
+//it consumes the input up to the start of the next file entry
+func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
+ if bodyLength <= 0 {
+ return nil
+ }
+ switch version {
+ case Version1:
+ bytes := make([]byte, bodyLength)
+ if _, err = r.ReadAt(bytes, offset); err != nil {
+ return
+ }
+ n.Data = bytes[:n.Size]
+ n.Checksum = NewCRC(n.Data)
+ case Version2:
+ bytes := make([]byte, bodyLength)
+ if _, err = r.ReadAt(bytes, offset); err != nil {
+ return
+ }
+ n.readNeedleDataVersion2(bytes[0:n.Size])
+ n.Checksum = NewCRC(n.Data)
+ default:
+ err = fmt.Errorf("Unsupported Version! (%d)", version)
+ }
+ return
+}
+
+func (n *Needle) IsGzipped() bool {
+ return n.Flags&FlagGzip > 0
+}
+func (n *Needle) SetGzipped() {
+ n.Flags = n.Flags | FlagGzip
+}
+func (n *Needle) HasName() bool {
+ return n.Flags&FlagHasName > 0
+}
+func (n *Needle) SetHasName() {
+ n.Flags = n.Flags | FlagHasName
+}
+func (n *Needle) HasMime() bool {
+ return n.Flags&FlagHasMime > 0
+}
+func (n *Needle) SetHasMime() {
+ n.Flags = n.Flags | FlagHasMime
+}
+func (n *Needle) HasLastModifiedDate() bool {
+ return n.Flags&FlagHasLastModifiedDate > 0
+}
+func (n *Needle) SetHasLastModifiedDate() {
+ n.Flags = n.Flags | FlagHasLastModifiedDate
+}
+func (n *Needle) HasTtl() bool {
+ return n.Flags&FlagHasTtl > 0
+}
+func (n *Needle) SetHasTtl() {
+ n.Flags = n.Flags | FlagHasTtl
+}
+
+func (n *Needle) IsChunkedManifest() bool {
+ return n.Flags&FlagIsChunkManifest > 0
+}
+
+func (n *Needle) SetIsChunkManifest() {
+ n.Flags = n.Flags | FlagIsChunkManifest
+}
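One subtlety in the layout above: when header + body + checksum is already 8-byte aligned, the padding formula yields a full 8-byte pad rather than zero, and Append uses the identical formula, so readers and writers agree. A numeric sketch (test name illustrative):

    package storage

    import "testing"

    func TestDiskSizePaddingSketch(t *testing.T) {
        // 16 (header) + 4 (body) + 4 (checksum) = 24 is already aligned,
        // yet the formula still adds a full 8-byte pad: DiskSize = 32
        n := &Needle{Size: 4}
        if got := n.DiskSize(); got != 32 {
            t.Fatalf("got %d, want 32", got)
        }
    }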
diff --git a/weed/storage/needle_test.go b/weed/storage/needle_test.go
new file mode 100644
index 000000000..c05afda2f
--- /dev/null
+++ b/weed/storage/needle_test.go
@@ -0,0 +1,45 @@
+package storage
+
+import "testing"
+
+func TestParseKeyHash(t *testing.T) {
+ testcases := []struct {
+ KeyHash string
+ ID uint64
+ Cookie uint32
+ Err bool
+ }{
+ // normal
+ {"4ed4c8116e41", 0x4ed4, 0xc8116e41, false},
+ // cookie with leading zeros
+ {"4ed401116e41", 0x4ed4, 0x01116e41, false},
+ // odd length
+ {"ed400116e41", 0xed4, 0x00116e41, false},
+ // uint
+ {"fed4c8114ed4c811f0116e41", 0xfed4c8114ed4c811, 0xf0116e41, false},
+ // err: too short
+ {"4ed4c811", 0, 0, true},
+ // err: too long
+ {"4ed4c8114ed4c8114ed4c8111", 0, 0, true},
+ // err: invalid character
+ {"helloworld", 0, 0, true},
+ }
+
+ for _, tc := range testcases {
+ if id, cookie, err := ParseKeyHash(tc.KeyHash); err != nil && !tc.Err {
+ t.Fatalf("Parse %s error: %v", tc.KeyHash, err)
+ } else if err == nil && tc.Err {
+ t.Fatalf("Parse %s expected error got nil", tc.KeyHash)
+ } else if id != tc.ID || cookie != tc.Cookie {
+ t.Fatalf("Parse %s wrong result. Expected: (%d, %d) got: (%d, %d)", tc.KeyHash, tc.ID, tc.Cookie, id, cookie)
+ }
+ }
+}
+
+func BenchmarkParseKeyHash(b *testing.B) {
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ ParseKeyHash("4ed44ed44ed44ed4c8116e41")
+ }
+}
diff --git a/weed/storage/replica_placement.go b/weed/storage/replica_placement.go
new file mode 100644
index 000000000..c1aca52eb
--- /dev/null
+++ b/weed/storage/replica_placement.go
@@ -0,0 +1,53 @@
+package storage
+
+import (
+ "errors"
+ "fmt"
+)
+
+type ReplicaPlacement struct {
+ SameRackCount int
+ DiffRackCount int
+ DiffDataCenterCount int
+}
+
+func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
+ rp := &ReplicaPlacement{}
+ for i, c := range t {
+ count := int(c - '0')
+ if 0 <= count && count <= 2 {
+ switch i {
+ case 0:
+ rp.DiffDataCenterCount = count
+ case 1:
+ rp.DiffRackCount = count
+ case 2:
+ rp.SameRackCount = count
+ }
+ } else {
+ return rp, errors.New("Unknown Replication Type:" + t)
+ }
+ }
+ return rp, nil
+}
+
+func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
+ return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b))
+}
+
+func (rp *ReplicaPlacement) Byte() byte {
+ ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
+ return byte(ret)
+}
+
+func (rp *ReplicaPlacement) String() string {
+ b := make([]byte, 3)
+ b[0] = byte(rp.DiffDataCenterCount + '0')
+ b[1] = byte(rp.DiffRackCount + '0')
+ b[2] = byte(rp.SameRackCount + '0')
+ return string(b)
+}
+
+func (rp *ReplicaPlacement) GetCopyCount() int {
+ return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
+}
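Each digit of the placement string is positional: copies in different data centers, copies in different racks, extra copies in the same rack. A sketch of the arithmetic (test name illustrative):

    package storage

    import "testing"

    func TestReplicaPlacementSketch(t *testing.T) {
        rp, err := NewReplicaPlacementFromString("110")
        if err != nil {
            t.Fatal(err)
        }
        // the original copy, plus one in another data center and one in another rack
        if rp.GetCopyCount() != 3 {
            t.Fatalf("copy count: %d", rp.GetCopyCount())
        }
        if rp.Byte() != 110 {
            t.Fatalf("byte encoding: %d", rp.Byte())
        }
    }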
diff --git a/weed/storage/replica_placement_test.go b/weed/storage/replica_placement_test.go
new file mode 100644
index 000000000..9c2161e94
--- /dev/null
+++ b/weed/storage/replica_placement_test.go
@@ -0,0 +1,14 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestReplicaPlacementSerialDeserial(t *testing.T) {
+ rp, _ := NewReplicaPlacementFromString("001")
+ new_rp, _ := NewReplicaPlacementFromByte(rp.Byte())
+ if rp.String() != new_rp.String() {
+ t.Fatalf("expected: %s, actual: %s", rp.String(), new_rp.String())
+ }
+}
diff --git a/weed/storage/store.go b/weed/storage/store.go
new file mode 100644
index 000000000..d44d6a863
--- /dev/null
+++ b/weed/storage/store.go
@@ -0,0 +1,340 @@
+package storage
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+)
+
+const (
+ MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
+)
+
+type MasterNodes struct {
+ nodes []string
+ lastNode int
+}
+
+func (mn *MasterNodes) String() string {
+ return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
+}
+
+func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
+ mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
+ return
+}
+func (mn *MasterNodes) reset() {
+ glog.V(4).Infof("Resetting master nodes: %v", mn)
+ if len(mn.nodes) > 1 && mn.lastNode >= 0 {
+ glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
+ // flip to a negative marker so the next findMaster call re-discovers the cluster
+ mn.lastNode = -mn.lastNode - 1
+ }
+}
+func (mn *MasterNodes) findMaster() (string, error) {
+ if len(mn.nodes) == 0 {
+ return "", errors.New("No master node found!")
+ }
+ if mn.lastNode < 0 {
+ for _, m := range mn.nodes {
+ glog.V(4).Infof("Listing masters on %s", m)
+ if masters, e := operation.ListMasters(m); e == nil {
+ if len(masters) == 0 {
+ continue
+ }
+ mn.nodes = append(masters, m)
+ mn.lastNode = rand.Intn(len(mn.nodes))
+ glog.V(2).Infof("current master nodes is %v", mn)
+ break
+ } else {
+ glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
+ }
+ }
+ }
+ if mn.lastNode < 0 {
+ return "", errors.New("No master node available!")
+ }
+ return mn.nodes[mn.lastNode], nil
+}
+
+/*
+ * A VolumeServer contains one Store
+ */
+type Store struct {
+ Ip string
+ Port int
+ PublicUrl string
+ Locations []*DiskLocation
+ dataCenter string //optional information, overriding the master's setting if present
+ rack string //optional information, overriding the master's setting if present
+ connected bool
+ volumeSizeLimit uint64 //read from the master
+ masterNodes *MasterNodes
+}
+
+func (s *Store) String() (str string) {
+ str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
+ return
+}
+
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
+ s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
+ s.Locations = make([]*DiskLocation, 0)
+ for i := 0; i < len(dirnames); i++ {
+ location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
+ location.loadExistingVolumes(needleMapKind)
+ s.Locations = append(s.Locations, location)
+ }
+ return
+}
+func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
+ rt, e := NewReplicaPlacementFromString(replicaPlacement)
+ if e != nil {
+ return e
+ }
+ ttl, e := ReadTTL(ttlString)
+ if e != nil {
+ return e
+ }
+ for _, range_string := range strings.Split(volumeListString, ",") {
+ if strings.Index(range_string, "-") < 0 {
+ id_string := range_string
+ id, err := NewVolumeId(id_string)
+ if err != nil {
+ return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
+ }
+ e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
+ } else {
+ pair := strings.Split(range_string, "-")
+ start, start_err := strconv.ParseUint(pair[0], 10, 64)
+ if start_err != nil {
+ return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
+ }
+ end, end_err := strconv.ParseUint(pair[1], 10, 64)
+ if end_err != nil {
+ return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
+ }
+ for id := start; id <= end; id++ {
+ if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
+ e = err
+ }
+ }
+ }
+ }
+ return e
+}
+func (s *Store) DeleteCollection(collection string) (e error) {
+ for _, location := range s.Locations {
+ e = location.DeleteCollectionFromDiskLocation(collection)
+ if e != nil {
+ return
+ }
+ }
+ return
+}
+
+func (s *Store) findVolume(vid VolumeId) *Volume {
+ for _, location := range s.Locations {
+ if v, found := location.volumes[vid]; found {
+ return v
+ }
+ }
+ return nil
+}
+func (s *Store) findFreeLocation() (ret *DiskLocation) {
+ max := 0
+ for _, location := range s.Locations {
+ currentFreeCount := location.MaxVolumeCount - len(location.volumes)
+ if currentFreeCount > max {
+ max = currentFreeCount
+ ret = location
+ }
+ }
+ return ret
+}
+func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+ if s.findVolume(vid) != nil {
+ return fmt.Errorf("Volume Id %d already exists!", vid)
+ }
+ if location := s.findFreeLocation(); location != nil {
+ glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, vid, collection, replicaPlacement, ttl)
+ if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
+ location.volumes[vid] = volume
+ return nil
+ } else {
+ return err
+ }
+ }
+ return fmt.Errorf("No more free space left")
+}
+
+func (s *Store) Status() []*VolumeInfo {
+ var stats []*VolumeInfo
+ for _, location := range s.Locations {
+ for k, v := range location.volumes {
+ s := &VolumeInfo{
+ Id: VolumeId(k),
+ Size: v.ContentSize(),
+ Collection: v.Collection,
+ ReplicaPlacement: v.ReplicaPlacement,
+ Version: v.Version(),
+ FileCount: v.nm.FileCount(),
+ DeleteCount: v.nm.DeletedCount(),
+ DeletedByteCount: v.nm.DeletedSize(),
+ ReadOnly: v.readOnly,
+ Ttl: v.Ttl}
+ stats = append(stats, si)
+ }
+ }
+ sortVolumeInfos(stats)
+ return stats
+}
+
+func (s *Store) SetDataCenter(dataCenter string) {
+ s.dataCenter = dataCenter
+}
+func (s *Store) SetRack(rack string) {
+ s.rack = rack
+}
+
+func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
+ s.masterNodes = NewMasterNodes(bootstrapMaster)
+}
+func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
+ masterNode, e = s.masterNodes.findMaster()
+ if e != nil {
+ return
+ }
+ var volumeMessages []*operation.VolumeInformationMessage
+ maxVolumeCount := 0
+ var maxFileKey uint64
+ for _, location := range s.Locations {
+ maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+ for k, v := range location.volumes {
+ if maxFileKey < v.nm.MaxFileKey() {
+ maxFileKey = v.nm.MaxFileKey()
+ }
+ if !v.expired(s.volumeSizeLimit) {
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
+ } else {
+ if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+ location.deleteVolumeById(v.Id)
+ glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ } else {
+ glog.V(0).Infoln("volume", v.Id, "is expired.")
+ }
+ }
+ }
+ }
+
+ joinMessage := &operation.JoinMessage{
+ IsInit: proto.Bool(!s.connected),
+ Ip: proto.String(s.Ip),
+ Port: proto.Uint32(uint32(s.Port)),
+ PublicUrl: proto.String(s.PublicUrl),
+ MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
+ MaxFileKey: proto.Uint64(maxFileKey),
+ DataCenter: proto.String(s.dataCenter),
+ Rack: proto.String(s.rack),
+ Volumes: volumeMessages,
+ }
+
+ data, err := proto.Marshal(joinMessage)
+ if err != nil {
+ return "", "", err
+ }
+
+ joinUrl := "http://" + masterNode + "/dir/join"
+ glog.V(4).Infof("Connecting to %s ...", joinUrl)
+
+ jsonBlob, err := util.PostBytes(joinUrl, data)
+ if err != nil {
+ s.masterNodes.reset()
+ return "", "", err
+ }
+ var ret operation.JoinResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
+ s.masterNodes.reset()
+ return masterNode, "", err
+ }
+ if ret.Error != "" {
+ s.masterNodes.reset()
+ return masterNode, "", errors.New(ret.Error)
+ }
+ s.volumeSizeLimit = ret.VolumeSizeLimit
+ secretKey = security.Secret(ret.SecretKey)
+ s.connected = true
+ return
+}
+func (s *Store) Close() {
+ for _, location := range s.Locations {
+ for _, v := range location.volumes {
+ v.Close()
+ }
+ }
+}
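+
+// Write stores one needle in volume i; when the volume gets close to its size
+// limit, a heartbeat is sent so the master can stop assigning writes to it.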
+func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
+ if v := s.findVolume(i); v != nil {
+ if v.readOnly {
+ err = fmt.Errorf("Volume %d is read only", i)
+ return
+ }
+ // estimate the new size with the needle's data length, since size is not known until the write
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(len(n.Data)) {
+ size, err = v.write(n)
+ } else {
+ err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
+ }
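+ // report to the master early if a few more writes of this size would exceed the limit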
+ if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
+ glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
+ if _, _, e := s.SendHeartbeatToMaster(); e != nil {
+ glog.V(0).Infoln("error when reporting size:", e)
+ }
+ }
+ return
+ }
+ glog.V(0).Infoln("volume", i, "not found!")
+ err = fmt.Errorf("Volume %d not found!", i)
+ return
+}
+func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
+ if v := s.findVolume(i); v != nil && !v.readOnly {
+ return v.delete(n)
+ }
+ return 0, nil
+}
+func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
+ if v := s.findVolume(i); v != nil {
+ return v.readNeedle(n)
+ }
+ return 0, fmt.Errorf("Volume %v not found!", i)
+}
+func (s *Store) GetVolume(i VolumeId) *Volume {
+ return s.findVolume(i)
+}
+
+func (s *Store) HasVolume(i VolumeId) bool {
+ v := s.findVolume(i)
+ return v != nil
+}
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
new file mode 100644
index 000000000..03825c159
--- /dev/null
+++ b/weed/storage/store_vacuum.go
@@ -0,0 +1,44 @@
+package storage
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString), false
+ }
+ garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
+ if e != nil {
+ return fmt.Errorf("garbageThreshold %s is not a valid float number", garbageThresholdString), false
+ }
+ if v := s.findVolume(vid); v != nil {
+ glog.V(3).Infoln(vid, "garbage level is", v.garbageLevel())
+ return nil, garbageThreshold < v.garbageLevel()
+ }
+ return fmt.Errorf("volume id %d is not found during check compact", vid), false
+}
+func (s *Store) CompactVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
+ }
+ if v := s.findVolume(vid); v != nil {
+ return v.Compact()
+ }
+ return fmt.Errorf("volume id %d is not found during compact", vid)
+}
+func (s *Store) CommitCompactVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
+ }
+ if v := s.findVolume(vid); v != nil {
+ return v.commitCompact()
+ }
+ return fmt.Errorf("volume id %d is not found during commit compact", vid)
+}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
new file mode 100644
index 000000000..d40bdc565
--- /dev/null
+++ b/weed/storage/volume.go
@@ -0,0 +1,430 @@
+package storage
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type Volume struct {
+ Id VolumeId
+ dir string
+ Collection string
+ dataFile *os.File
+ nm NeedleMapper
+ needleMapKind NeedleMapType
+ readOnly bool
+
+ SuperBlock
+
+ dataFileAccessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
+}
+
+func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+ v = &Volume{dir: dirname, Collection: collection, Id: id}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
+ v.needleMapKind = needleMapKind
+ e = v.load(true, true, needleMapKind)
+ return
+}
+func (v *Volume) String() string {
+ return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
+}
+
+func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
+ v = &Volume{dir: dirname, Collection: collection, Id: id}
+ v.SuperBlock = SuperBlock{}
+ v.needleMapKind = needleMapKind
+ e = v.load(false, false, needleMapKind)
+ return
+}
+func (v *Volume) FileName() (fileName string) {
+ if v.Collection == "" {
+ fileName = path.Join(v.dir, v.Id.String())
+ } else {
+ fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
+ }
+ return
+}
+func (v *Volume) DataFile() *os.File {
+ return v.dataFile
+}
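+
+// On disk a volume <collection>_<id> (or just <id>) consists of:
+//   <name>.dat - super block followed by appended needles
+//   <name>.idx - needle index entries (key, offset, size)
+//   <name>.ldb / <name>.bdb - optional LevelDB/BoltDB needle index stores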
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
+ var e error
+ fileName := v.FileName()
+
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
+ if !canRead {
+ return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
+ }
+ if canWrite {
+ v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
+ } else {
+ glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
+ v.dataFile, e = os.Open(fileName + ".dat")
+ v.readOnly = true
+ }
+ } else {
+ if createDatIfMissing {
+ v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ } else {
+ return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
+ }
+ }
+
+ if e != nil {
+ if !os.IsPermission(e) {
+ return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
+ }
+ }
+
+ if v.ReplicaPlacement == nil {
+ e = v.readSuperBlock()
+ } else {
+ e = v.maybeWriteSuperBlock()
+ }
+ if e == nil && alsoLoadIndex {
+ var indexFile *os.File
+ if v.readOnly {
+ glog.V(1).Infoln("open to read file", fileName+".idx")
+ if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
+ return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
+ }
+ } else {
+ glog.V(1).Infoln("open to write file", fileName+".idx")
+ if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
+ return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
+ }
+ }
+ switch needleMapKind {
+ case NeedleMapInMemory:
+ glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
+ if v.nm, e = LoadNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+ }
+ case NeedleMapLevelDb:
+ glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
+ case NeedleMapBoltDb:
+ glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
+ if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
+ glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
+ }
+ }
+ }
+ return e
+}
+func (v *Volume) Version() Version {
+ return v.SuperBlock.Version()
+}
+func (v *Volume) Size() int64 {
+ stat, e := v.dataFile.Stat()
+ if e == nil {
+ return stat.Size()
+ }
+ glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
+ return -1
+}
+
+// Close cleanly shuts down this volume
+func (v *Volume) Close() {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ v.nm.Close()
+ _ = v.dataFile.Close()
+}
+
+func (v *Volume) NeedToReplicate() bool {
+ return v.ReplicaPlacement.GetCopyCount() > 1
+}
+
+// isFileUnchanged checks whether the needle to be written is identical to
+// the version already stored in the volume.
+// It requires serialized access to the same volume.
+func (v *Volume) isFileUnchanged(n *Needle) bool {
+ if v.Ttl.String() != "" {
+ return false
+ }
+ nv, ok := v.nm.Get(n.Id)
+ if ok && nv.Offset > 0 {
+ oldNeedle := new(Needle)
+ err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ glog.V(0).Infof("Failed to check updated file %v", err)
+ return false
+ }
+ defer oldNeedle.ReleaseMemory()
+ if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
+ n.DataSize = oldNeedle.DataSize
+ return true
+ }
+ }
+ return false
+}
+
+// Destroy removes everything related to this volume
+func (v *Volume) Destroy() (err error) {
+ if v.readOnly {
+ err = fmt.Errorf("%s is read-only", v.dataFile.Name())
+ return
+ }
+ v.Close()
+ err = os.Remove(v.dataFile.Name())
+ if err != nil {
+ return
+ }
+ err = v.nm.Destroy()
+ return
+}
+
+// AppendBlob appends a blob to the end of the data file; used in replication
+func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
+ if v.readOnly {
+ err = fmt.Errorf("%s is read-only", v.dataFile.Name())
+ return
+ }
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ if offset, err = v.dataFile.Seek(0, 2); err != nil {
+ glog.V(0).Infof("failed to seek the end of file: %v", err)
+ return
+ }
+ // ensure writes start at an aligned position
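+ // e.g. with NeedlePaddingSize = 8, an offset of 13 becomes 13 + (8 - 13%8) = 16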
+ if offset%NeedlePaddingSize != 0 {
+ offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+ if offset, err = v.dataFile.Seek(offset, 0); err != nil {
+ glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
+ return
+ }
+ }
+ _, err = v.dataFile.Write(b)
+ return
+}
+
+func (v *Volume) write(n *Needle) (size uint32, err error) {
+ glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
+ if v.readOnly {
+ err = fmt.Errorf("%s is read-only", v.dataFile.Name())
+ return
+ }
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ if v.isFileUnchanged(n) {
+ size = n.DataSize
+ glog.V(4).Infof("needle is unchanged!")
+ return
+ }
+ var offset int64
+ if offset, err = v.dataFile.Seek(0, 2); err != nil {
+ glog.V(0).Infof("failed to seek the end of file: %v", err)
+ return
+ }
+
+ // ensure writes start at an aligned position
+ if offset%NeedlePaddingSize != 0 {
+ offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+ if offset, err = v.dataFile.Seek(offset, 0); err != nil {
+ glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
+ return
+ }
+ }
+
+ if size, err = n.Append(v.dataFile, v.Version()); err != nil {
+ if e := v.dataFile.Truncate(offset); e != nil {
+ err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
+ }
+ return
+ }
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
+ if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+ }
+ }
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
+ return
+}
+
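+// delete removes a needle logically: a zero-size tombstone is appended to
+// the data file and the key is dropped from the needle map; the space is
+// reclaimed later by vacuum compaction (see volume_vacuum.go).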
+func (v *Volume) delete(n *Needle) (uint32, error) {
+ glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
+ if v.readOnly {
+ return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
+ }
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ nv, ok := v.nm.Get(n.Id)
+ //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ if ok {
+ size := nv.Size
+ if err := v.nm.Delete(n.Id); err != nil {
+ return size, err
+ }
+ if _, err := v.dataFile.Seek(0, 2); err != nil {
+ return size, err
+ }
+ n.Data = nil
+ _, err := n.Append(v.dataFile, v.Version())
+ return size, err
+ }
+ return 0, nil
+}
+
+// readNeedle fills in the Needle content by looking up n.Id in the NeedleMapper
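+// An entry whose ttl has elapsed is reported as not found, even though its
+// bytes may still sit in the data file until the volume is vacuumed.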
+func (v *Volume) readNeedle(n *Needle) (int, error) {
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return 0, err
+ }
+ bytesRead := len(n.Data)
+ if !n.HasTtl() {
+ return bytesRead, nil
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
+ }
+ n.ReleaseMemory()
+ return -1, errors.New("Not Found")
+}
+
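+// ScanVolumeFile walks a volume's .dat file, invoking the visitor callbacks.
+// For example (illustrative), counting live needles:
+//   count := 0
+//   err := ScanVolumeFile("/data", "", vid, NeedleMapInMemory,
+//     func(sb SuperBlock) error { return nil },
+//     false,
+//     func(n *Needle, offset int64) error {
+//       if n.Size > 0 {
+//         count++
+//       }
+//       return nil
+//     })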
+func ScanVolumeFile(dirname string, collection string, id VolumeId,
+ needleMapKind NeedleMapType,
+ visitSuperBlock func(SuperBlock) error,
+ readNeedleBody bool,
+ visitNeedle func(n *Needle, offset int64) error) (err error) {
+ var v *Volume
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
+ return fmt.Errorf("Failed to load volume %d: %v", id, err)
+ }
+ if err = visitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
+ }
+
+ version := v.Version()
+
+ offset := int64(SuperBlockSize)
+ n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
+ if e != nil {
+ err = fmt.Errorf("cannot read needle header: %v", e)
+ return
+ }
+ for n != nil {
+ if readNeedleBody {
+ if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
+ glog.V(0).Infof("cannot read needle body: %v", err)
+ //err = fmt.Errorf("cannot read needle body: %v", err)
+ //return
+ }
+ if n.DataSize >= n.Size {
+ // this should come from a bug reported on #87 and #93
+ // fixed in v0.69
+ // remove this whole "if" clause later, long after 0.69
+ oldRest, oldSize := rest, n.Size
+ padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
+ n.Size = 0
+ rest = n.Size + NeedleChecksumSize + padding
+ if rest%NeedlePaddingSize != 0 {
+ rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
+ }
+ glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
+ }
+ }
+ if err = visitNeedle(n, offset); err != nil {
+ glog.V(0).Infof("visit needle error: %v", err)
+ }
+ offset += int64(NeedleHeaderSize) + int64(rest)
+ glog.V(4).Infof("==> new entry offset %d", offset)
+ if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read needle header: %v", err)
+ }
+ glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ }
+
+ return
+}
+
+func (v *Volume) ContentSize() uint64 {
+ return v.nm.ContentSize()
+}
+
+func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
+ fi, err := os.Stat(filename)
+ if err != nil {
+ exists = !os.IsNotExist(err)
+ return
+ }
+ exists = true
+ if fi.Mode()&0400 != 0 {
+ canRead = true
+ }
+ if fi.Mode()&0200 != 0 {
+ canWrite = true
+ }
+ modTime = fi.ModTime()
+ return
+}
+
+// expired reports whether the volume's ttl has elapsed since its last
+// modification. A volume is never considered expired when it is empty,
+// when it has no ttl, or when volumeSizeLimit is 0 (the server just
+// started and does not know the limit yet).
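+// For example (illustrative): a volume with ttl "3m" whose last write was 5
+// minutes ago has lived 5 minutes > 3, so it is expired.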
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// expiredLongEnough reports whether the volume has stayed expired past the
+// removal delay: the lesser of maxDelayMinutes and 10% of the ttl minutes.
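+// For example (illustrative): with a ttl of 100 minutes the removal delay
+// is 10 minutes, so removal is allowed 110 minutes after the last write
+// (assuming maxDelayMinutes >= 10).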
+func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}
diff --git a/weed/storage/volume_id.go b/weed/storage/volume_id.go
new file mode 100644
index 000000000..0333c6cf0
--- /dev/null
+++ b/weed/storage/volume_id.go
@@ -0,0 +1,18 @@
+package storage
+
+import (
+ "strconv"
+)
+
+type VolumeId uint32
+
+func NewVolumeId(vid string) (VolumeId, error) {
+ volumeId, err := strconv.ParseUint(vid, 10, 32)
+ return VolumeId(volumeId), err
+}
+func (vid *VolumeId) String() string {
+ return strconv.FormatUint(uint64(*vid), 10)
+}
+func (vid *VolumeId) Next() VolumeId {
+ return VolumeId(uint32(*vid) + 1)
+}
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
new file mode 100644
index 000000000..b3068eec3
--- /dev/null
+++ b/weed/storage/volume_info.go
@@ -0,0 +1,65 @@
+package storage
+
+import (
+ "fmt"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+)
+
+type VolumeInfo struct {
+ Id VolumeId
+ Size uint64
+ ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
+ Collection string
+ Version Version
+ FileCount int
+ DeleteCount int
+ DeletedByteCount uint64
+ ReadOnly bool
+}
+
+func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
+ vi = VolumeInfo{
+ Id: VolumeId(*m.Id),
+ Size: *m.Size,
+ Collection: *m.Collection,
+ FileCount: int(*m.FileCount),
+ DeleteCount: int(*m.DeleteCount),
+ DeletedByteCount: *m.DeletedByteCount,
+ ReadOnly: *m.ReadOnly,
+ Version: Version(*m.Version),
+ }
+ rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
+ if e != nil {
+ return vi, e
+ }
+ vi.ReplicaPlacement = rp
+ vi.Ttl = LoadTTLFromUint32(*m.Ttl)
+ return vi, nil
+}
+
+func (vi VolumeInfo) String() string {
+ return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
+ vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
+}
+
+/* VolumeInfo sorting */
+
+type volumeInfos []*VolumeInfo
+
+func (vis volumeInfos) Len() int {
+ return len(vis)
+}
+
+func (vis volumeInfos) Less(i, j int) bool {
+ return vis[i].Id < vis[j].Id
+}
+
+func (vis volumeInfos) Swap(i, j int) {
+ vis[i], vis[j] = vis[j], vis[i]
+}
+
+func sortVolumeInfos(vis volumeInfos) {
+ sort.Sort(vis)
+}
diff --git a/weed/storage/volume_info_test.go b/weed/storage/volume_info_test.go
new file mode 100644
index 000000000..9a9c43ad2
--- /dev/null
+++ b/weed/storage/volume_info_test.go
@@ -0,0 +1,23 @@
+package storage
+
+import "testing"
+
+func TestSortVolumeInfos(t *testing.T) {
+ vis := []*VolumeInfo{
+ {Id: 2},
+ {Id: 1},
+ {Id: 3},
+ }
+ sortVolumeInfos(vis)
+ for i := 0; i < len(vis); i++ {
+ if vis[i].Id != VolumeId(i+1) {
+ t.Fatal()
+ }
+ }
+}
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
new file mode 100644
index 000000000..fc773273d
--- /dev/null
+++ b/weed/storage/volume_super_block.go
@@ -0,0 +1,81 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ SuperBlockSize = 8
+)
+
+/*
+* Super block currently has 8 bytes allocated for each volume.
+* Byte 0: version, 1 or 2
+* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Byte 4 and byte 5: The number of times the volume has been compacted.
+* Remaining bytes: reserved
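+*
+* For example (illustrative): a version-2 super block with replication "001"
+* (which encodes to byte value 1) and ttl "3d" serializes as:
+* 02 01 03 03 00 00 00 00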
+ */
+type SuperBlock struct {
+ version Version
+ ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
+ CompactRevision uint16
+}
+
+func (s *SuperBlock) Version() Version {
+ return s.version
+}
+func (s *SuperBlock) Bytes() []byte {
+ header := make([]byte, SuperBlockSize)
+ header[0] = byte(s.version)
+ header[1] = s.ReplicaPlacement.Byte()
+ s.Ttl.ToBytes(header[2:4])
+ util.Uint16toBytes(header[4:6], s.CompactRevision)
+ return header
+}
+
+func (v *Volume) maybeWriteSuperBlock() error {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile, e)
+ return e
+ }
+ if stat.Size() == 0 {
+ v.SuperBlock.version = CurrentVersion
+ _, e = v.dataFile.Write(v.SuperBlock.Bytes())
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
+ if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
+ v.readOnly = false
+ }
+ }
+ }
+ }
+ return e
+}
+func (v *Volume) readSuperBlock() (err error) {
+ if _, err = v.dataFile.Seek(0, 0); err != nil {
+ return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
+ }
+ header := make([]byte, SuperBlockSize)
+ if _, e := io.ReadFull(v.dataFile, header); e != nil {
+ return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e)
+ }
+ v.SuperBlock, err = ParseSuperBlock(header)
+ return err
+}
+func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
+ superBlock.version = Version(header[0])
+ if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
+ err = fmt.Errorf("cannot read replica type: %s", err.Error())
+ }
+ superBlock.Ttl = LoadTTLFromBytes(header[2:4])
+ superBlock.CompactRevision = util.BytesToUint16(header[4:6])
+ return
+}
diff --git a/weed/storage/volume_super_block_test.go b/weed/storage/volume_super_block_test.go
new file mode 100644
index 000000000..13db4b194
--- /dev/null
+++ b/weed/storage/volume_super_block_test.go
@@ -0,0 +1,23 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestSuperBlockReadWrite(t *testing.T) {
+ rp, _ := NewReplicaPlacementFromByte(byte(001))
+ ttl, _ := ReadTTL("15d")
+ s := &SuperBlock{
+ version: CurrentVersion,
+ ReplicaPlacement: rp,
+ Ttl: ttl,
+ }
+
+ bytes := s.Bytes()
+
+ if !(bytes[2] == 15 && bytes[3] == Day) {
+ println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
+ t.Fail()
+ }
+
+}
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
new file mode 100644
index 000000000..231ff31c2
--- /dev/null
+++ b/weed/storage/volume_sync.go
@@ -0,0 +1,213 @@
+package storage
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/url"
+ "os"
+ "sort"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// A volume synchronizes with its master volume in two steps:
+// 1. The slave checks the master side to find the subscription checkpoint
+//    to set up the replication.
+// 2. The slave receives the updates from the master.
+
+/*
+Assume the slave volume needs to follow the master volume.
+
+The master volume could be compacted, and could be many files ahead of
+slave volume.
+
+Step 1:
+The slave volume will ask the master volume for a snapshot
+of (existing file entries, last offset, number of compacted times).
+
+For each entry x in master existing file entries:
+ if x does not exist locally:
+ add x locally
+
+For each entry y in local slave existing file entries:
+ if y does not exist on master:
+ delete y locally
+
+Step 2:
+After this, use the last offset and the number of compacted times to ask the
+master volume to send a new file, and keep looping. If the number of
+compacted times has changed, go back to step 1 (very likely this can be
+optimized further later).
+
+*/
+
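+// Synchronize runs the loop described above; a slave volume server would
+// call, e.g. (illustrative): err := v.Synchronize("master-volume-host:8080")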
+func (v *Volume) Synchronize(volumeServer string) (err error) {
+ var lastCompactRevision, compactRevision uint16
+ var masterMap CompactMap
+ for i := 0; i < 3; i++ {
+ if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
+ return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
+ }
+ if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
+ if err = v.Compact(); err != nil {
+ return fmt.Errorf("Compact Volume before synchronizing %v", err)
+ }
+ if err = v.commitCompact(); err != nil {
+ return fmt.Errorf("Commit Compact before synchronizing %v", err)
+ }
+ }
+ lastCompactRevision = compactRevision
+ if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
+ return
+ }
+ }
+ return
+}
+
+type ByOffset []NeedleValue
+
+func (a ByOffset) Len() int { return len(a) }
+func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
+
+// trySynchronizing syncs with the remote volume server incrementally by
+// making up the delta between the local and remote entries.
+func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
+ slaveIdxFile, err := os.Open(v.nm.IndexFileName())
+ if err != nil {
+ return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
+ }
+ defer slaveIdxFile.Close()
+ slaveMap, err := LoadNeedleMap(slaveIdxFile)
+ if err != nil {
+ return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
+ }
+ var delta []NeedleValue
+ if err := masterMap.Visit(func(needleValue NeedleValue) error {
+ if needleValue.Key == 0 {
+ return nil
+ }
+ if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok {
+ return nil // skip intersection
+ }
+ delta = append(delta, needleValue)
+ return nil
+ }); err != nil {
+ return fmt.Errorf("Add master entry: %v", err)
+ }
+ if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
+ if needleValue.Key == 0 {
+ return nil
+ }
+ if _, ok := masterMap.Get(needleValue.Key); ok {
+ return nil // skip intersection
+ }
+ needleValue.Size = 0
+ delta = append(delta, needleValue)
+ return nil
+ }); err != nil {
+ return fmt.Errorf("Remove local entry: %v", err)
+ }
+
+ // replay in the same order as the remote .dat file's needle entries
+ sort.Sort(ByOffset(delta))
+
+ // make up the delta
+ fetchCount := 0
+ volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
+ for _, needleValue := range delta {
+ if needleValue.Size == 0 {
+ // remove file entry from local
+ v.removeNeedle(needleValue.Key)
+ continue
+ }
+ // add master file entry to local data file
+ if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
+ glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
+ return err
+ }
+ fetchCount++
+ }
+ glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
+ return nil
+}
+
+func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
+ m = NewCompactMap()
+
+ syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
+ if err != nil {
+ return m, 0, 0, err
+ }
+
+ total := 0
+ err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
+ // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
+ if offset != 0 && size != 0 {
+ m.Set(Key(key), offset, size)
+ } else {
+ m.Delete(Key(key))
+ }
+ total++
+ })
+
+ glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
+ return m, syncStatus.TailOffset, syncStatus.CompactRevision, err
+}
+
+func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
+ var syncStatus = operation.SyncVolumeResponse{}
+ if stat, err := v.dataFile.Stat(); err == nil {
+ syncStatus.TailOffset = uint64(stat.Size())
+ }
+ syncStatus.IdxFileSize = v.nm.IndexFileSize()
+ syncStatus.CompactRevision = v.SuperBlock.CompactRevision
+ syncStatus.Ttl = v.SuperBlock.Ttl.String()
+ syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
+ return syncStatus
+}
+
+func (v *Volume) IndexFileContent() ([]byte, error) {
+ return v.nm.IndexFileContent()
+}
+
+// removeNeedle removes one needle by needle key
+func (v *Volume) removeNeedle(key Key) {
+ n := new(Needle)
+ n.Id = uint64(key)
+ v.delete(n)
+}
+
+// fetchNeedle fetches a remote volume needle by vid, id, offset.
+// The compact revision is checked first, in case the remote volume has been
+// compacted and the offset is no longer valid.
+func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
+ needleValue NeedleValue, compactRevision uint16) error {
+ // add master file entry to local data file
+ values := make(url.Values)
+ values.Add("revision", strconv.Itoa(int(compactRevision)))
+ values.Add("volume", v.Id.String())
+ values.Add("id", needleValue.Key.String())
+ values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
+ values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
+ glog.V(4).Infof("Fetch %+v", needleValue)
+ return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
+ }
+ offset, err := v.AppendBlob(b)
+ if err != nil {
+ return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
+ }
+ // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
+ // record the new local offset for this key, propagating any index error
+ return v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size)
+ })
+}
diff --git a/weed/storage/volume_ttl.go b/weed/storage/volume_ttl.go
new file mode 100644
index 000000000..4318bb048
--- /dev/null
+++ b/weed/storage/volume_ttl.go
@@ -0,0 +1,135 @@
+package storage
+
+import (
+ "strconv"
+)
+
+const (
+ //stored unit types
+ Empty byte = iota
+ Minute
+ Hour
+ Day
+ Week
+ Month
+ Year
+)
+
+type TTL struct {
+ count byte
+ unit byte
+}
+
+var EMPTY_TTL = &TTL{}
+
+// ReadTTL translates a readable ttl into the internal TTL representation.
+// Supported formats, for example:
+// 3m: 3 minutes
+// 4h: 4 hours
+// 5d: 5 days
+// 6w: 6 weeks
+// 7M: 7 months
+// 8y: 8 years
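+//
+// For example (illustrative):
+//   ttl, _ := ReadTTL("3d")
+//   ttl.Minutes() // 4320 == 3 * 24 * 60
+//   ttl.String()  // "3d"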
+func ReadTTL(ttlString string) (*TTL, error) {
+ if ttlString == "" {
+ return EMPTY_TTL, nil
+ }
+ ttlBytes := []byte(ttlString)
+ unitByte := ttlBytes[len(ttlBytes)-1]
+ countBytes := ttlBytes[0 : len(ttlBytes)-1]
+ if '0' <= unitByte && unitByte <= '9' {
+ countBytes = ttlBytes
+ unitByte = 'm'
+ }
+ count, err := strconv.Atoi(string(countBytes))
+ unit := toStoredByte(unitByte)
+ return &TTL{count: byte(count), unit: unit}, err
+}
+
+// LoadTTLFromBytes loads a TTL from its 2-byte stored form
+func LoadTTLFromBytes(input []byte) (t *TTL) {
+ return &TTL{count: input[0], unit: input[1]}
+}
+
+// LoadTTLFromUint32 loads a TTL from its packed uint32 form
+func LoadTTLFromUint32(ttl uint32) (t *TTL) {
+ input := make([]byte, 2)
+ input[1] = byte(ttl)
+ input[0] = byte(ttl >> 8)
+ return LoadTTLFromBytes(input)
+}
+
+// ToBytes writes the TTL into a 2-byte output slice
+func (t *TTL) ToBytes(output []byte) {
+ output[0] = t.count
+ output[1] = t.unit
+}
+
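+// ToUint32 packs count and unit into a uint32. For example (illustrative):
+// ttl "3d" (count=3, unit=Day=3) packs to 0x0303 == 771, and
+// LoadTTLFromUint32(771) restores an equivalent TTL.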
+func (t *TTL) ToUint32() (output uint32) {
+ output = uint32(t.count) << 8
+ output += uint32(t.unit)
+ return output
+}
+
+func (t *TTL) String() string {
+ if t == nil || t.count == 0 {
+ return ""
+ }
+ if t.unit == Empty {
+ return ""
+ }
+ countString := strconv.Itoa(int(t.count))
+ switch t.unit {
+ case Minute:
+ return countString + "m"
+ case Hour:
+ return countString + "h"
+ case Day:
+ return countString + "d"
+ case Week:
+ return countString + "w"
+ case Month:
+ return countString + "M"
+ case Year:
+ return countString + "y"
+ }
+ return ""
+}
+
+func toStoredByte(readableUnitByte byte) byte {
+ switch readableUnitByte {
+ case 'm':
+ return Minute
+ case 'h':
+ return Hour
+ case 'd':
+ return Day
+ case 'w':
+ return Week
+ case 'M':
+ return Month
+ case 'y':
+ return Year
+ }
+ return 0
+}
+
+func (t TTL) Minutes() uint32 {
+ switch t.unit {
+ case Empty:
+ return 0
+ case Minute:
+ return uint32(t.count)
+ case Hour:
+ return uint32(t.count) * 60
+ case Day:
+ return uint32(t.count) * 60 * 24
+ case Week:
+ return uint32(t.count) * 60 * 24 * 7
+ case Month:
+ return uint32(t.count) * 60 * 24 * 31
+ case Year:
+ return uint32(t.count) * 60 * 24 * 365
+ }
+ return 0
+}
diff --git a/weed/storage/volume_ttl_test.go b/weed/storage/volume_ttl_test.go
new file mode 100644
index 000000000..216469a4c
--- /dev/null
+++ b/weed/storage/volume_ttl_test.go
@@ -0,0 +1,60 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestTTLReadWrite(t *testing.T) {
+ ttl, _ := ReadTTL("")
+ if ttl.Minutes() != 0 {
+ t.Errorf("empty ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("9")
+ if ttl.Minutes() != 9 {
+ t.Errorf("9 ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("8m")
+ if ttl.Minutes() != 8 {
+ t.Errorf("8m ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5h")
+ if ttl.Minutes() != 300 {
+ t.Errorf("5h ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5d")
+ if ttl.Minutes() != 5*24*60 {
+ t.Errorf("5d ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5w")
+ if ttl.Minutes() != 5*7*24*60 {
+ t.Errorf("5w ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5M")
+ if ttl.Minutes() != 5*31*24*60 {
+ t.Errorf("5M ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5y")
+ if ttl.Minutes() != 5*365*24*60 {
+ t.Errorf("5y ttl:%v", ttl)
+ }
+
+ output := make([]byte, 2)
+ ttl.ToBytes(output)
+ ttl2 := LoadTTLFromBytes(output)
+ if ttl.Minutes() != ttl2.Minutes() {
+ t.Errorf("ttl:%v ttl2:%v", ttl, ttl2)
+ }
+
+ ttl3 := LoadTTLFromUint32(ttl.ToUint32())
+ if ttl.Minutes() != ttl3.Minutes() {
+ t.Errorf("ttl:%v ttl3:%v", ttl, ttl3)
+ }
+
+}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
new file mode 100644
index 000000000..9b9a27816
--- /dev/null
+++ b/weed/storage/volume_vacuum.go
@@ -0,0 +1,93 @@
+package storage
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
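+// garbageLevel is the fraction of stored content that has been deleted.
+// For example (illustrative): 40MB deleted out of 100MB of content gives
+// 0.4, which a garbage threshold of 0.3 would flag for compaction.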
+func (v *Volume) garbageLevel() float64 {
+ if v.ContentSize() == 0 {
+ return 0
+ }
+ return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
+}
+
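+// Compact writes compacted copies of the volume to <name>.cpd and <name>.cpx.
+// A typical vacuum sequence (illustrative):
+//
+//   if v.garbageLevel() > threshold {
+//     _ = v.Compact()       // copy live needles to .cpd/.cpx
+//     _ = v.commitCompact() // swap them in as .dat/.idx and reload
+//   }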
+func (v *Volume) Compact() error {
+ glog.V(3).Infof("Compacting ...")
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
+
+ filePath := v.FileName()
+ glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+ return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
+}
+func (v *Volume) commitCompact() error {
+ glog.V(3).Infof("Committing vacuuming...")
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ glog.V(3).Infof("Got Committing lock...")
+ v.nm.Close()
+ _ = v.dataFile.Close()
+ var e error
+ if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+ return e
+ }
+ if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+ return e
+ }
+ //glog.V(3).Infof("Pretending to be vacuuming...")
+ //time.Sleep(20 * time.Second)
+ glog.V(3).Infof("Loading Commit file...")
+ if e = v.load(true, false, v.needleMapKind); e != nil {
+ return e
+ }
+ return nil
+}
+
+func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
+ var (
+ dst, idx *os.File
+ )
+ if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+ return
+ }
+ defer dst.Close()
+
+ if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+ return
+ }
+ defer idx.Close()
+
+ nm := NewNeedleMap(idx)
+ newOffset := int64(SuperBlockSize)
+
+ now := uint64(time.Now().Unix())
+
+ err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind,
+ func(superBlock SuperBlock) error {
+ superBlock.CompactRevision++
+ _, err = dst.Write(superBlock.Bytes())
+ return err
+ }, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
+ }
+ nv, ok := v.nm.Get(n.Id)
+ glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
+ if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
+ if err = nm.Put(n.Id, uint32(newOffset/NeedlePaddingSize), n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
+ }
+ if _, err = n.Append(dst, v.Version()); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
+ }
+ newOffset += n.DiskSize()
+ glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+ }
+ return nil
+ })
+
+ return
+}
diff --git a/weed/storage/volume_version.go b/weed/storage/volume_version.go
new file mode 100644
index 000000000..2e9f58aa2
--- /dev/null
+++ b/weed/storage/volume_version.go
@@ -0,0 +1,9 @@
+package storage
+
+type Version uint8
+
+const (
+ Version1 = Version(1)
+ Version2 = Version(2)
+ CurrentVersion = Version2
+)