Diffstat (limited to 'go/storage')
-rw-r--r--  go/storage/compact_map.go             207
-rw-r--r--  go/storage/compact_map_perf_test.go    45
-rw-r--r--  go/storage/compact_map_test.go         77
-rw-r--r--  go/storage/crc.go                      30
-rw-r--r--  go/storage/disk_location.go            73
-rw-r--r--  go/storage/file_id.go                  43
-rw-r--r--  go/storage/needle.go                  231
-rw-r--r--  go/storage/needle_byte_cache.go        75
-rw-r--r--  go/storage/needle_map.go              123
-rw-r--r--  go/storage/needle_map_boltdb.go       165
-rw-r--r--  go/storage/needle_map_leveldb.go      134
-rw-r--r--  go/storage/needle_map_memory.go       106
-rw-r--r--  go/storage/needle_read_write.go       291
-rw-r--r--  go/storage/needle_test.go              45
-rw-r--r--  go/storage/replica_placement.go        53
-rw-r--r--  go/storage/replica_placement_test.go   14
-rw-r--r--  go/storage/store.go                   340
-rw-r--r--  go/storage/store_vacuum.go             44
-rw-r--r--  go/storage/volume.go                  430
-rw-r--r--  go/storage/volume_id.go                18
-rw-r--r--  go/storage/volume_info.go              65
-rw-r--r--  go/storage/volume_info_test.go         23
-rw-r--r--  go/storage/volume_super_block.go       81
-rw-r--r--  go/storage/volume_super_block_test.go  23
-rw-r--r--  go/storage/volume_sync.go             213
-rw-r--r--  go/storage/volume_ttl.go              135
-rw-r--r--  go/storage/volume_ttl_test.go          60
-rw-r--r--  go/storage/volume_vacuum.go            93
-rw-r--r--  go/storage/volume_version.go            9
29 files changed, 0 insertions, 3246 deletions
diff --git a/go/storage/compact_map.go b/go/storage/compact_map.go
deleted file mode 100644
index d4438d044..000000000
--- a/go/storage/compact_map.go
+++ /dev/null
@@ -1,207 +0,0 @@
-package storage
-
-import (
- "strconv"
- "sync"
-)
-
-type NeedleValue struct {
- Key Key
- Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 `comment:"Size of the data portion"`
-}
-
-const (
- batch = 100000
-)
-
-type Key uint64
-
-func (k Key) String() string {
- return strconv.FormatUint(uint64(k), 10)
-}
-
-type CompactSection struct {
- sync.RWMutex
- values []NeedleValue
- overflow map[Key]NeedleValue
- start Key
- end Key
- counter int
-}
-
-func NewCompactSection(start Key) *CompactSection {
- return &CompactSection{
- values: make([]NeedleValue, batch),
- overflow: make(map[Key]NeedleValue),
- start: start,
- }
-}
-
-//return old entry size
-func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 {
- ret := uint32(0)
- if key > cs.end {
- cs.end = key
- }
- cs.Lock()
- if i := cs.binarySearchValues(key); i >= 0 {
- ret = cs.values[i].Size
- //println("key", key, "old size", ret)
- cs.values[i].Offset, cs.values[i].Size = offset, size
- } else {
- needOverflow := cs.counter >= batch
- needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
- if needOverflow {
- //println("start", cs.start, "counter", cs.counter, "key", key)
- if oldValue, found := cs.overflow[key]; found {
- ret = oldValue.Size
- }
- cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
- } else {
- p := &cs.values[cs.counter]
- p.Key, p.Offset, p.Size = key, offset, size
- //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
- cs.counter++
- }
- }
- cs.Unlock()
- return ret
-}
-
-//return old entry size
-func (cs *CompactSection) Delete(key Key) uint32 {
- cs.Lock()
- ret := uint32(0)
- if i := cs.binarySearchValues(key); i >= 0 {
- if cs.values[i].Size > 0 {
- ret = cs.values[i].Size
- cs.values[i].Size = 0
- }
- }
- if v, found := cs.overflow[key]; found {
- delete(cs.overflow, key)
- ret = v.Size
- }
- cs.Unlock()
- return ret
-}
-func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
- cs.RLock()
- if v, ok := cs.overflow[key]; ok {
- cs.RUnlock()
- return &v, true
- }
- if i := cs.binarySearchValues(key); i >= 0 {
- cs.RUnlock()
- return &cs.values[i], true
- }
- cs.RUnlock()
- return nil, false
-}
-func (cs *CompactSection) binarySearchValues(key Key) int {
- l, h := 0, cs.counter-1
- if h >= 0 && cs.values[h].Key < key {
- return -2
- }
- //println("looking for key", key)
- for l <= h {
- m := (l + h) / 2
- //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
- if cs.values[m].Key < key {
- l = m + 1
- } else if key < cs.values[m].Key {
- h = m - 1
- } else {
- //println("found", m)
- return m
- }
- }
- return -1
-}
-
-//CompactMap assumes keys are mostly inserted in increasing order
-type CompactMap struct {
- list []*CompactSection
-}
-
-func NewCompactMap() CompactMap {
- return CompactMap{}
-}
-
-func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
- x := cm.binarySearchCompactSection(key)
- if x < 0 {
- //println(x, "creating", len(cm.list), "section, starting", key)
- cm.list = append(cm.list, NewCompactSection(key))
- x = len(cm.list) - 1
- //keep compact section sorted by start
- for x > 0 {
- if cm.list[x-1].start > cm.list[x].start {
- cm.list[x-1], cm.list[x] = cm.list[x], cm.list[x-1]
- x = x - 1
- } else {
- break
- }
- }
- }
- return cm.list[x].Set(key, offset, size)
-}
-func (cm *CompactMap) Delete(key Key) uint32 {
- x := cm.binarySearchCompactSection(key)
- if x < 0 {
- return uint32(0)
- }
- return cm.list[x].Delete(key)
-}
-func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
- x := cm.binarySearchCompactSection(key)
- if x < 0 {
- return nil, false
- }
- return cm.list[x].Get(key)
-}
-func (cm *CompactMap) binarySearchCompactSection(key Key) int {
- l, h := 0, len(cm.list)-1
- if h < 0 {
- return -5
- }
- if cm.list[h].start <= key {
- if cm.list[h].counter < batch || key <= cm.list[h].end {
- return h
- }
- return -4
- }
- for l <= h {
- m := (l + h) / 2
- if key < cm.list[m].start {
- h = m - 1
- } else { // cm.list[m].start <= key
- if cm.list[m+1].start <= key {
- l = m + 1
- } else {
- return m
- }
- }
- }
- return -3
-}
-
-// Visit visits all entries, stopping at the first error returned by visit.
-func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
- for _, cs := range cm.list {
- for _, v := range cs.overflow {
- if err := visit(v); err != nil {
- return err
- }
- }
- for _, v := range cs.values {
- if _, found := cs.overflow[v.Key]; !found {
- if err := visit(v); err != nil {
- return err
- }
- }
- }
- }
- return nil
-}
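
A minimal sketch of driving the CompactMap above through its public API, assuming the pre-deletion import path github.com/chrislusf/seaweedfs/go/storage used throughout these files (Set returns the size of any entry it replaced; Delete returns the size it freed):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func main() {
	m := storage.NewCompactMap()
	m.Set(storage.Key(10001), 1024, 512) // key, offset (in 8-byte units), size
	if v, ok := m.Get(storage.Key(10001)); ok {
		fmt.Println("found:", v.Key, v.Offset, v.Size)
	}
	freed := m.Delete(storage.Key(10001))
	fmt.Println("freed bytes:", freed)
	// Visit walks each section's sorted values plus its overflow entries.
	_ = m.Visit(func(v storage.NeedleValue) error {
		fmt.Println("entry:", v.Key)
		return nil
	})
}
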
diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go
deleted file mode 100644
index 1b429f263..000000000
--- a/go/storage/compact_map_perf_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package storage
-
-import (
- "log"
- "os"
- "testing"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-func TestMemoryUsage(t *testing.T) {
-
-	indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDONLY, 0644) // read-only access is all this benchmark needs
- if ie != nil {
- log.Fatalln(ie)
- }
- LoadNewNeedleMap(indexFile)
-
-}
-
-func LoadNewNeedleMap(file *os.File) CompactMap {
- m := NewCompactMap()
- bytes := make([]byte, 16*1024)
- count, e := file.Read(bytes)
- if count > 0 {
- fstat, _ := file.Stat()
- glog.V(0).Infoln("Loading index file", fstat.Name(), "size", fstat.Size())
- }
- for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- if offset > 0 {
- m.Set(Key(key), offset, size)
- } else {
- //delete(m, key)
- }
- }
-
- count, e = file.Read(bytes)
- }
- return m
-}
diff --git a/go/storage/compact_map_test.go b/go/storage/compact_map_test.go
deleted file mode 100644
index 1ccb48edb..000000000
--- a/go/storage/compact_map_test.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package storage
-
-import (
- "testing"
-)
-
-func TestIssue52(t *testing.T) {
- m := NewCompactMap()
- m.Set(Key(10002), 10002, 10002)
- if element, ok := m.Get(Key(10002)); ok {
- println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size)
- }
- m.Set(Key(10001), 10001, 10001)
- if element, ok := m.Get(Key(10002)); ok {
- println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size)
- } else {
- t.Fatal("key 10002 missing after setting 10001")
- }
-}
-
-func TestXYZ(t *testing.T) {
- m := NewCompactMap()
- for i := uint32(0); i < 100*batch; i += 2 {
- m.Set(Key(i), i, i)
- }
-
- for i := uint32(0); i < 100*batch; i += 37 {
- m.Delete(Key(i))
- }
-
- for i := uint32(0); i < 10*batch; i += 3 {
- m.Set(Key(i), i+11, i+5)
- }
-
- // for i := uint32(0); i < 100; i++ {
- // if v := m.Get(Key(i)); v != nil {
- // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size)
- // }
- // }
-
- for i := uint32(0); i < 10*batch; i++ {
- v, ok := m.Get(Key(i))
- if i%3 == 0 {
- if !ok {
- t.Fatal("key", i, "missing!")
- }
- if v.Size != i+5 {
- t.Fatal("key", i, "size", v.Size)
- }
- } else if i%37 == 0 {
- if ok && v.Size > 0 {
- t.Fatal("key", i, "should have been deleted needle value", v)
- }
- } else if i%2 == 0 {
- if v.Size != i {
- t.Fatal("key", i, "size", v.Size)
- }
- }
- }
-
- for i := uint32(10 * batch); i < 100*batch; i++ {
- v, ok := m.Get(Key(i))
- if i%37 == 0 {
- if ok && v.Size > 0 {
- t.Fatal("key", i, "should have been deleted needle value", v)
- }
- } else if i%2 == 0 {
- if v == nil {
- t.Fatal("key", i, "missing")
- }
- if v.Size != i {
- t.Fatal("key", i, "size", v.Size)
- }
- }
- }
-
-}
diff --git a/go/storage/crc.go b/go/storage/crc.go
deleted file mode 100644
index 21e384854..000000000
--- a/go/storage/crc.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package storage
-
-import (
- "fmt"
- "github.com/klauspost/crc32"
-
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-var table = crc32.MakeTable(crc32.Castagnoli)
-
-type CRC uint32
-
-func NewCRC(b []byte) CRC {
- return CRC(0).Update(b)
-}
-
-func (c CRC) Update(b []byte) CRC {
- return CRC(crc32.Update(uint32(c), table, b))
-}
-
-func (c CRC) Value() uint32 {
- return uint32(c>>15|c<<17) + 0xa282ead8
-}
-
-func (n *Needle) Etag() string {
- bits := make([]byte, 4)
- util.Uint32toBytes(bits, uint32(n.Checksum))
- return fmt.Sprintf("\"%x\"", bits)
-}
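
CRC.Value above applies a rotate-and-add mask over a Castagnoli CRC32, the same masking scheme used by Snappy/LevelDB framing. A standalone sketch of the computation using the standard library's hash/crc32 (the klauspost/crc32 package imported above is an API-compatible accelerated drop-in):

package main

import (
	"fmt"
	"hash/crc32"
)

var table = crc32.MakeTable(crc32.Castagnoli)

// maskedValue mirrors CRC.Value: rotate right by 15 bits, then add a constant.
func maskedValue(c uint32) uint32 {
	return (c>>15 | c<<17) + 0xa282ead8
}

func main() {
	sum := crc32.Checksum([]byte("hello world"), table)
	fmt.Printf("raw=%08x masked=%08x\n", sum, maskedValue(sum))
}
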
diff --git a/go/storage/disk_location.go b/go/storage/disk_location.go
deleted file mode 100644
index 8cca1a68e..000000000
--- a/go/storage/disk_location.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package storage
-
-import (
- "io/ioutil"
- "strings"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-type DiskLocation struct {
- Directory string
- MaxVolumeCount int
- volumes map[VolumeId]*Volume
-}
-
-func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
- location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
- location.volumes = make(map[VolumeId]*Volume)
- return location
-}
-
-func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
-
- if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
- for _, dir := range dirs {
- name := dir.Name()
- if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
- collection := ""
- base := name[:len(name)-len(".dat")]
- i := strings.LastIndex(base, "_")
- if i > 0 {
- collection, base = base[0:i], base[i+1:]
- }
- if vid, err := NewVolumeId(base); err == nil {
- if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
- l.volumes[vid] = v
- glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
- } else {
- glog.V(0).Infof("new volume %s error %s", name, e)
- }
- }
- }
- }
- }
- }
- glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
-}
-
-func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
- for k, v := range l.volumes {
- if v.Collection == collection {
- e = l.deleteVolumeById(k)
- if e != nil {
- return
- }
- }
- }
- return
-}
-
-func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
- v, ok := l.volumes[vid]
- if !ok {
- return
- }
- e = v.Destroy()
- if e != nil {
- return
- }
- delete(l.volumes, vid)
- return
-}
diff --git a/go/storage/file_id.go b/go/storage/file_id.go
deleted file mode 100644
index 64b61ba89..000000000
--- a/go/storage/file_id.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package storage
-
-import (
- "encoding/hex"
- "errors"
- "strings"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type FileId struct {
- VolumeId VolumeId
- Key uint64
- Hashcode uint32
-}
-
-func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId {
- return &FileId{VolumeId: VolumeId, Key: n.Id, Hashcode: n.Cookie}
-}
-func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId {
- return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
-}
-func ParseFileId(fid string) (*FileId, error) {
- a := strings.Split(fid, ",")
- if len(a) != 2 {
- glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a))
- return nil, errors.New("Invalid fid " + fid)
- }
- vid_string, key_hash_string := a[0], a[1]
- volumeId, _ := NewVolumeId(vid_string)
- key, hash, e := ParseKeyHash(key_hash_string)
- return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e
-}
-func (n *FileId) String() string {
- bytes := make([]byte, 12)
- util.Uint64toBytes(bytes[0:8], n.Key)
- util.Uint32toBytes(bytes[8:12], n.Hashcode)
- nonzero_index := 0
- for ; bytes[nonzero_index] == 0; nonzero_index++ {
- }
- return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:])
-}
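
FileId.String renders a fid as "<volumeId>,<hex>", where the hex part is the 8-byte key followed by the 4-byte cookie with leading zero bytes trimmed. A sketch of that encoding with encoding/binary, assuming the big-endian order of util.Uint64toBytes (consistent with the needle_test.go cases below); the volume id 3 is a made-up example:

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

func main() {
	key, cookie := uint64(0x4ed4), uint32(0xc8116e41)
	b := make([]byte, 12)
	binary.BigEndian.PutUint64(b[0:8], key)
	binary.BigEndian.PutUint32(b[8:12], cookie)
	i := 0
	for b[i] == 0 { // trim leading zero bytes, as FileId.String does
		i++
	}
	fmt.Println("3," + hex.EncodeToString(b[i:])) // prints 3,4ed4c8116e41
}
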
diff --git a/go/storage/needle.go b/go/storage/needle.go
deleted file mode 100644
index 8ab76c0f3..000000000
--- a/go/storage/needle.go
+++ /dev/null
@@ -1,231 +0,0 @@
-package storage
-
-import (
- "fmt"
- "io/ioutil"
- "mime"
- "net/http"
- "path"
- "strconv"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/images"
- "github.com/chrislusf/seaweedfs/go/operation"
-)
-
-const (
- NeedleHeaderSize = 16 //should never change this
- NeedlePaddingSize = 8
- NeedleChecksumSize = 4
- MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
-)
-
-/*
-* A Needle represents an uploaded and stored file.
-* Needle file size is limited to 4GB for now.
- */
-type Needle struct {
- Cookie uint32 `comment:"random number to mitigate brute force lookups"`
- Id uint64 `comment:"needle id"`
- Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
-
- DataSize uint32 `comment:"Data size"` //version2
- Data []byte `comment:"The actual file data"`
- Flags byte `comment:"boolean flags"` //version2
- NameSize uint8 //version2
- Name []byte `comment:"maximum 256 characters"` //version2
- MimeSize uint8 //version2
- Mime []byte `comment:"maximum 256 characters"` //version2
- LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
- Ttl *TTL
-
- Checksum CRC `comment:"CRC32 to check integrity"`
- Padding []byte `comment:"Aligned to 8 bytes"`
-
-	rawBlock *Block // underlying supporting []byte, fetched from and released into a pool
-}
-
-func (n *Needle) String() (str string) {
- str = fmt.Sprintf("Cookie:%d, Id:%d, Size:%d, DataSize:%d, Name: %s, Mime: %s", n.Cookie, n.Id, n.Size, n.DataSize, n.Name, n.Mime)
- return
-}
-
-func ParseUpload(r *http.Request) (
- fileName string, data []byte, mimeType string, isGzipped bool,
- modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
- form, fe := r.MultipartReader()
- if fe != nil {
- glog.V(0).Infoln("MultipartReader [ERROR]", fe)
- e = fe
- return
- }
-
- //first multi-part item
- part, fe := form.NextPart()
- if fe != nil {
- glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
- e = fe
- return
- }
-
- fileName = part.FileName()
- if fileName != "" {
- fileName = path.Base(fileName)
- }
-
- data, e = ioutil.ReadAll(part)
- if e != nil {
- glog.V(0).Infoln("Reading Content [ERROR]", e)
- return
- }
-
-	//if the filename is an empty string, search the remaining multi-part items
- for fileName == "" {
- part2, fe := form.NextPart()
- if fe != nil {
-			break // no more parts, or a read error; stop searching
- }
-
- fName := part2.FileName()
-
-		//use the first multi-part item that carries a filename
- if fName != "" {
- data2, fe2 := ioutil.ReadAll(part2)
- if fe2 != nil {
- glog.V(0).Infoln("Reading Content [ERROR]", fe2)
- e = fe2
- return
- }
-
-			//replace the data and filename with this part's
- data = data2
- fileName = path.Base(fName)
- break
- }
- }
-
- dotIndex := strings.LastIndex(fileName, ".")
- ext, mtype := "", ""
- if dotIndex > 0 {
- ext = strings.ToLower(fileName[dotIndex:])
- mtype = mime.TypeByExtension(ext)
- }
- contentType := part.Header.Get("Content-Type")
- if contentType != "" && mtype != contentType {
-		mimeType = contentType //only return the mime type when it cannot be deduced from the extension
- mtype = contentType
- }
- if part.Header.Get("Content-Encoding") == "gzip" {
- isGzipped = true
- } else if operation.IsGzippable(ext, mtype) {
- if data, e = operation.GzipData(data); e != nil {
- return
- }
- isGzipped = true
- }
- if ext == ".gz" {
- isGzipped = true
- }
- if strings.HasSuffix(fileName, ".gz") &&
- !strings.HasSuffix(fileName, ".tar.gz") {
- fileName = fileName[:len(fileName)-3]
- }
- modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
- ttl, _ = ReadTTL(r.FormValue("ttl"))
- isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
- return
-}
-func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
- fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
- n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
- if e != nil {
- return
- }
- if len(fname) < 256 {
- n.Name = []byte(fname)
- n.SetHasName()
- }
- if len(mimeType) < 256 {
- n.Mime = []byte(mimeType)
- n.SetHasMime()
- }
- if isGzipped {
- n.SetGzipped()
- }
- if n.LastModified == 0 {
- n.LastModified = uint64(time.Now().Unix())
- }
- n.SetHasLastModifiedDate()
- if n.Ttl != EMPTY_TTL {
- n.SetHasTtl()
- }
-
- if isChunkedFile {
- n.SetIsChunkManifest()
- }
-
- if fixJpgOrientation {
- loweredName := strings.ToLower(fname)
- if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
- n.Data = images.FixJpgOrientation(n.Data)
- }
- }
-
- n.Checksum = NewCRC(n.Data)
-
- commaSep := strings.LastIndex(r.URL.Path, ",")
- dotSep := strings.LastIndex(r.URL.Path, ".")
- fid := r.URL.Path[commaSep+1:]
- if dotSep > 0 {
- fid = r.URL.Path[commaSep+1 : dotSep]
- }
-
- e = n.ParsePath(fid)
-
- return
-}
-func (n *Needle) ParsePath(fid string) (err error) {
- length := len(fid)
- if length <= 8 {
- return fmt.Errorf("Invalid fid: %s", fid)
- }
- delta := ""
- deltaIndex := strings.LastIndex(fid, "_")
- if deltaIndex > 0 {
- fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
- }
- n.Id, n.Cookie, err = ParseKeyHash(fid)
- if err != nil {
- return err
- }
- if delta != "" {
- if d, e := strconv.ParseUint(delta, 10, 64); e == nil {
- n.Id += d
- } else {
- return e
- }
- }
- return err
-}
-
-func ParseKeyHash(key_hash_string string) (uint64, uint32, error) {
- if len(key_hash_string) <= 8 {
- return 0, 0, fmt.Errorf("KeyHash is too short.")
- }
- if len(key_hash_string) > 24 {
- return 0, 0, fmt.Errorf("KeyHash is too long.")
- }
- split := len(key_hash_string) - 8
- key, err := strconv.ParseUint(key_hash_string[:split], 16, 64)
- if err != nil {
- return 0, 0, fmt.Errorf("Parse key error: %v", err)
- }
- hash, err := strconv.ParseUint(key_hash_string[split:], 16, 32)
- if err != nil {
- return 0, 0, fmt.Errorf("Parse hash error: %v", err)
- }
- return key, uint32(hash), nil
-}
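
ParseKeyHash splits the hex string from the right: the last 8 hex digits are the 32-bit cookie, and everything before them (1 to 16 digits) is the 64-bit key. A standalone sketch of that split:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	keyHash := "4ed4c8116e41" // from the test cases below: key 0x4ed4, cookie 0xc8116e41
	split := len(keyHash) - 8
	key, _ := strconv.ParseUint(keyHash[:split], 16, 64)
	cookie, _ := strconv.ParseUint(keyHash[split:], 16, 32)
	fmt.Printf("key=%x cookie=%x\n", key, cookie)
}
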
diff --git a/go/storage/needle_byte_cache.go b/go/storage/needle_byte_cache.go
deleted file mode 100644
index 5db0f8895..000000000
--- a/go/storage/needle_byte_cache.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package storage
-
-import (
- "fmt"
- "os"
- "sync/atomic"
-
- "github.com/hashicorp/golang-lru"
-
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-var (
- bytesCache *lru.Cache
- bytesPool *util.BytesPool
-)
-
-/*
-There is one level of caching and one level of pooling.
-
-In pooling, all []byte buffers are fetched from and returned to the pool bytesPool.
-
-In caching, the mapping from cache key (a string) to the []byte block is cached.
-*/
-func init() {
- bytesPool = util.NewBytesPool()
- bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) {
- value.(*Block).decreaseReference()
- })
-}
-
-type Block struct {
- Bytes []byte
- refCount int32
-}
-
-func (block *Block) decreaseReference() {
- if atomic.AddInt32(&block.refCount, -1) == 0 {
- bytesPool.Put(block.Bytes)
- }
-}
-func (block *Block) increaseReference() {
- atomic.AddInt32(&block.refCount, 1)
-}
-
-// get bytes from the LRU cache of []byte first, then from the bytes pool
-// when []byte in LRU cache is evicted, it will be put back to the bytes pool
-func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) {
- // check cache, return if found
- cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
- if obj, found := bytesCache.Get(cacheKey); found {
- block = obj.(*Block)
- block.increaseReference()
- dataSlice = block.Bytes[0:readSize]
- return dataSlice, block, nil
- }
-
- // get the []byte from pool
- b := bytesPool.Get(readSize)
-	// refCount = 2: one reference held by the bytesCache, one by the needle object being read
- block = &Block{Bytes: b, refCount: 2}
- dataSlice = block.Bytes[0:readSize]
- _, err = r.ReadAt(dataSlice, offset)
- bytesCache.Add(cacheKey, block)
- return dataSlice, block, err
-}
-
-func (n *Needle) ReleaseMemory() {
- if n.rawBlock != nil {
- n.rawBlock.decreaseReference()
- }
-}
-func ReleaseBytes(b []byte) {
- bytesPool.Put(b)
-}
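
A Block starts life with refCount = 2 — one reference held by the LRU cache, one by the reader — and its buffer returns to the pool only when the last reference is dropped. A minimal sketch of that protocol, independent of the actual cache and pool:

package main

import (
	"fmt"
	"sync/atomic"
)

type block struct{ refCount int32 }

// release drops one reference; it reports true when the buffer
// would be handed back to the bytes pool.
func (b *block) release() bool {
	return atomic.AddInt32(&b.refCount, -1) == 0
}

func main() {
	b := &block{refCount: 2} // one ref for the cache, one for the reader
	fmt.Println(b.release()) // reader done: false, the cache still holds it
	fmt.Println(b.release()) // cache eviction: true, buffer is pooled again
}
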
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
deleted file mode 100644
index 638a9b4af..000000000
--- a/go/storage/needle_map.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package storage
-
-import (
- "fmt"
- "io/ioutil"
- "os"
- "sync"
-
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type NeedleMapType int
-
-const (
- NeedleMapInMemory NeedleMapType = iota
- NeedleMapLevelDb
- NeedleMapBoltDb
-)
-
-type NeedleMapper interface {
- Put(key uint64, offset uint32, size uint32) error
- Get(key uint64) (element *NeedleValue, ok bool)
- Delete(key uint64) error
- Close()
- Destroy() error
- ContentSize() uint64
- DeletedSize() uint64
- FileCount() int
- DeletedCount() int
- MaxFileKey() uint64
- IndexFileSize() uint64
- IndexFileContent() ([]byte, error)
- IndexFileName() string
-}
-
-type baseNeedleMapper struct {
- indexFile *os.File
- indexFileAccessLock sync.Mutex
-
- mapMetric
-}
-
-func (nm *baseNeedleMapper) IndexFileSize() uint64 {
- stat, err := nm.indexFile.Stat()
- if err == nil {
- return uint64(stat.Size())
- }
- return 0
-}
-
-func (nm *baseNeedleMapper) IndexFileName() string {
- return nm.indexFile.Name()
-}
-
-func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) {
- key = util.BytesToUint64(bytes[:8])
- offset = util.BytesToUint32(bytes[8:12])
- size = util.BytesToUint32(bytes[12:16])
- return
-}
-func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
- bytes := make([]byte, 16)
- util.Uint64toBytes(bytes[0:8], key)
- util.Uint32toBytes(bytes[8:12], offset)
- util.Uint32toBytes(bytes[12:16], size)
-
- nm.indexFileAccessLock.Lock()
- defer nm.indexFileAccessLock.Unlock()
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot seek end of indexfile %s: %v",
- nm.indexFile.Name(), err)
- }
- _, err := nm.indexFile.Write(bytes)
- return err
-}
-func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) {
- nm.indexFileAccessLock.Lock()
- defer nm.indexFileAccessLock.Unlock()
- return ioutil.ReadFile(nm.indexFile.Name())
-}
-
-type mapMetric struct {
- indexFile *os.File
-
- DeletionCounter int `json:"DeletionCounter"`
- FileCounter int `json:"FileCounter"`
- DeletionByteCounter uint64 `json:"DeletionByteCounter"`
- FileByteCounter uint64 `json:"FileByteCounter"`
- MaximumFileKey uint64 `json:"MaxFileKey"`
-}
-
-func (mm *mapMetric) logDelete(deletedByteCount uint32) {
- mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
- mm.DeletionCounter++
-}
-
-func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
- if key > mm.MaximumFileKey {
- mm.MaximumFileKey = key
- }
- mm.FileCounter++
- mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
- if oldSize > 0 {
- mm.DeletionCounter++
- mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
- }
-}
-
-func (mm mapMetric) ContentSize() uint64 {
- return mm.FileByteCounter
-}
-func (mm mapMetric) DeletedSize() uint64 {
- return mm.DeletionByteCounter
-}
-func (mm mapMetric) FileCount() int {
- return mm.FileCounter
-}
-func (mm mapMetric) DeletedCount() int {
- return mm.DeletionCounter
-}
-func (mm mapMetric) MaxFileKey() uint64 {
- return mm.MaximumFileKey
-}
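
Each index file record is exactly 16 bytes: an 8-byte key, a 4-byte offset counted in 8-byte units (see compact_map.go), and a 4-byte size, big-endian as in the util package. A sketch of one record round-trip with encoding/binary:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	rec := make([]byte, 16)
	binary.BigEndian.PutUint64(rec[0:8], 0x4ed4) // needle key
	binary.BigEndian.PutUint32(rec[8:12], 12345) // offset in 8-byte units; 0 marks a deletion
	binary.BigEndian.PutUint32(rec[12:16], 2048) // size of the needle's data portion
	key := binary.BigEndian.Uint64(rec[0:8])
	offset := binary.BigEndian.Uint32(rec[8:12])
	size := binary.BigEndian.Uint32(rec[12:16])
	fmt.Println(key, offset, size)
}
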
diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go
deleted file mode 100644
index e95c016bb..000000000
--- a/go/storage/needle_map_boltdb.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package storage
-
-import (
- "fmt"
- "os"
-
- "github.com/boltdb/bolt"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type BoltDbNeedleMap struct {
- dbFileName string
- db *bolt.DB
- baseNeedleMapper
-}
-
-var boltdbBucket = []byte("weed")
-
-func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
- m = &BoltDbNeedleMap{dbFileName: dbFileName}
- m.indexFile = indexFile
- if !isBoltDbFresh(dbFileName, indexFile) {
- glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
- generateBoltDbFile(dbFileName, indexFile)
- glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
- }
- glog.V(1).Infof("Opening %s...", dbFileName)
- if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil {
- return
- }
- glog.V(1).Infof("Loading %s...", indexFile.Name())
- nm, indexLoadError := LoadNeedleMap(indexFile)
- if indexLoadError != nil {
- return nil, indexLoadError
- }
- m.mapMetric = nm.mapMetric
- return
-}
-
-func isBoltDbFresh(dbFileName string, indexFile *os.File) bool {
- // normally we always write to index file first
- dbLogFile, err := os.Open(dbFileName)
- if err != nil {
- return false
- }
- defer dbLogFile.Close()
- dbStat, dbStatErr := dbLogFile.Stat()
- indexStat, indexStatErr := indexFile.Stat()
- if dbStatErr != nil || indexStatErr != nil {
- glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
- return false
- }
-
- return dbStat.ModTime().After(indexStat.ModTime())
-}
-
-func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
- db, err := bolt.Open(dbFileName, 0644, nil)
- if err != nil {
- return err
- }
- defer db.Close()
- return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
- if offset > 0 {
- boltDbWrite(db, key, offset, size)
- } else {
- boltDbDelete(db, key)
- }
- return nil
- })
-}
-
-func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- bytes := make([]byte, 8)
- var data []byte
- util.Uint64toBytes(bytes, key)
- err := m.db.View(func(tx *bolt.Tx) error {
- bucket := tx.Bucket(boltdbBucket)
- if bucket == nil {
- return fmt.Errorf("Bucket %q not found!", boltdbBucket)
- }
-
- data = bucket.Get(bytes)
- return nil
- })
-
- if err != nil || len(data) != 8 {
- return nil, false
- }
- offset := util.BytesToUint32(data[0:4])
- size := util.BytesToUint32(data[4:8])
- return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
-}
-
-func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
- var oldSize uint32
- if oldNeedle, ok := m.Get(key); ok {
- oldSize = oldNeedle.Size
- }
- m.logPut(key, oldSize, size)
- // write to index file first
- if err := m.appendToIndexFile(key, offset, size); err != nil {
- return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
- }
- return boltDbWrite(m.db, key, offset, size)
-}
-
-func boltDbWrite(db *bolt.DB,
- key uint64, offset uint32, size uint32) error {
- bytes := make([]byte, 16)
- util.Uint64toBytes(bytes[0:8], key)
- util.Uint32toBytes(bytes[8:12], offset)
- util.Uint32toBytes(bytes[12:16], size)
- return db.Update(func(tx *bolt.Tx) error {
- bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
- if err != nil {
- return err
- }
-
- err = bucket.Put(bytes[0:8], bytes[8:16])
- if err != nil {
- return err
- }
- return nil
- })
-}
-func boltDbDelete(db *bolt.DB, key uint64) error {
- bytes := make([]byte, 8)
- util.Uint64toBytes(bytes, key)
- return db.Update(func(tx *bolt.Tx) error {
- bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
- if err != nil {
- return err
- }
-
- err = bucket.Delete(bytes)
- if err != nil {
- return err
- }
- return nil
- })
-}
-
-func (m *BoltDbNeedleMap) Delete(key uint64) error {
- if oldNeedle, ok := m.Get(key); ok {
- m.logDelete(oldNeedle.Size)
- }
- // write to index file first
- if err := m.appendToIndexFile(key, 0, 0); err != nil {
- return err
- }
- return boltDbDelete(m.db, key)
-}
-
-func (m *BoltDbNeedleMap) Close() {
- m.db.Close()
-}
-
-func (m *BoltDbNeedleMap) Destroy() error {
- m.Close()
- os.Remove(m.indexFile.Name())
- return os.Remove(m.dbFileName)
-}
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
deleted file mode 100644
index 47f63e3ae..000000000
--- a/go/storage/needle_map_leveldb.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package storage
-
-import (
- "fmt"
- "os"
- "path/filepath"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
- "github.com/syndtr/goleveldb/leveldb"
-)
-
-type LevelDbNeedleMap struct {
- dbFileName string
- db *leveldb.DB
- baseNeedleMapper
-}
-
-func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
- m = &LevelDbNeedleMap{dbFileName: dbFileName}
- m.indexFile = indexFile
- if !isLevelDbFresh(dbFileName, indexFile) {
- glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
- generateLevelDbFile(dbFileName, indexFile)
- glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
- }
- glog.V(1).Infof("Opening %s...", dbFileName)
- if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil {
- return
- }
- glog.V(1).Infof("Loading %s...", indexFile.Name())
- nm, indexLoadError := LoadNeedleMap(indexFile)
- if indexLoadError != nil {
- return nil, indexLoadError
- }
- m.mapMetric = nm.mapMetric
- return
-}
-
-func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
- // normally we always write to index file first
- dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
- if err != nil {
- return false
- }
- defer dbLogFile.Close()
- dbStat, dbStatErr := dbLogFile.Stat()
- indexStat, indexStatErr := indexFile.Stat()
- if dbStatErr != nil || indexStatErr != nil {
- glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
- return false
- }
-
- return dbStat.ModTime().After(indexStat.ModTime())
-}
-
-func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
- db, err := leveldb.OpenFile(dbFileName, nil)
- if err != nil {
- return err
- }
- defer db.Close()
- return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
- if offset > 0 {
- levelDbWrite(db, key, offset, size)
- } else {
- levelDbDelete(db, key)
- }
- return nil
- })
-}
-
-func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- bytes := make([]byte, 8)
- util.Uint64toBytes(bytes, key)
- data, err := m.db.Get(bytes, nil)
- if err != nil || len(data) != 8 {
- return nil, false
- }
- offset := util.BytesToUint32(data[0:4])
- size := util.BytesToUint32(data[4:8])
- return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
-}
-
-func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
- var oldSize uint32
- if oldNeedle, ok := m.Get(key); ok {
- oldSize = oldNeedle.Size
- }
- m.logPut(key, oldSize, size)
- // write to index file first
- if err := m.appendToIndexFile(key, offset, size); err != nil {
- return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
- }
- return levelDbWrite(m.db, key, offset, size)
-}
-
-func levelDbWrite(db *leveldb.DB,
- key uint64, offset uint32, size uint32) error {
- bytes := make([]byte, 16)
- util.Uint64toBytes(bytes[0:8], key)
- util.Uint32toBytes(bytes[8:12], offset)
- util.Uint32toBytes(bytes[12:16], size)
- if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
- return fmt.Errorf("failed to write leveldb: %v", err)
- }
- return nil
-}
-func levelDbDelete(db *leveldb.DB, key uint64) error {
- bytes := make([]byte, 8)
- util.Uint64toBytes(bytes, key)
- return db.Delete(bytes, nil)
-}
-
-func (m *LevelDbNeedleMap) Delete(key uint64) error {
- if oldNeedle, ok := m.Get(key); ok {
- m.logDelete(oldNeedle.Size)
- }
- // write to index file first
- if err := m.appendToIndexFile(key, 0, 0); err != nil {
- return err
- }
- return levelDbDelete(m.db, key)
-}
-
-func (m *LevelDbNeedleMap) Close() {
- m.db.Close()
-}
-
-func (m *LevelDbNeedleMap) Destroy() error {
- m.Close()
- os.Remove(m.indexFile.Name())
- return os.Remove(m.dbFileName)
-}
diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go
deleted file mode 100644
index 2b1fc1b54..000000000
--- a/go/storage/needle_map_memory.go
+++ /dev/null
@@ -1,106 +0,0 @@
-package storage
-
-import (
- "io"
- "os"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-type NeedleMap struct {
- m CompactMap
-
- baseNeedleMapper
-}
-
-func NewNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
- m: NewCompactMap(),
- }
- nm.indexFile = file
- return nm
-}
-
-const (
- RowsToRead = 1024
-)
-
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
- nm := NewNeedleMap(file)
- e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
- if key > nm.MaximumFileKey {
- nm.MaximumFileKey = key
- }
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- } else {
- oldSize := nm.m.Delete(Key(key))
- glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- return nil
- })
- glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
- return nm, e
-}
-
-// WalkIndexFile walks through the index file, calling fn with each (key, offset, size) entry.
-// It stops early and returns the first error returned by fn.
-func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
- var readerOffset int64
- bytes := make([]byte, 16*RowsToRead)
- count, e := r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
- readerOffset += int64(count)
- var (
- key uint64
- offset, size uint32
- i int
- )
-
- for count > 0 && e == nil || e == io.EOF {
- for i = 0; i+16 <= count; i += 16 {
- key, offset, size = idxFileEntry(bytes[i : i+16])
- if e = fn(key, offset, size); e != nil {
- return e
- }
- }
- if e == io.EOF {
- return nil
- }
- count, e = r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
- readerOffset += int64(count)
- }
- return e
-}
-
-func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
- oldSize := nm.m.Set(Key(key), offset, size)
- nm.logPut(key, oldSize, size)
- return nm.appendToIndexFile(key, offset, size)
-}
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- element, ok = nm.m.Get(Key(key))
- return
-}
-func (nm *NeedleMap) Delete(key uint64) error {
- deletedBytes := nm.m.Delete(Key(key))
- nm.logDelete(deletedBytes)
- return nm.appendToIndexFile(key, 0, 0)
-}
-func (nm *NeedleMap) Close() {
- _ = nm.indexFile.Close()
-}
-func (nm *NeedleMap) Destroy() error {
- nm.Close()
- return os.Remove(nm.indexFile.Name())
-}
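
A sketch of scanning a .idx file with WalkIndexFile as defined above, counting live entries; the file name 1.idx is a made-up example, and the import path is the pre-deletion one:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func main() {
	f, err := os.Open("1.idx")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	live := 0
	err = storage.WalkIndexFile(f, func(key uint64, offset, size uint32) error {
		if offset > 0 { // offset 0 marks a deletion record
			live++
		}
		return nil
	})
	fmt.Println("live entries:", live, "err:", err)
}
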
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
deleted file mode 100644
index fcca2469c..000000000
--- a/go/storage/needle_read_write.go
+++ /dev/null
@@ -1,291 +0,0 @@
-package storage
-
-import (
- "errors"
- "fmt"
- "io"
- "os"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-const (
- FlagGzip = 0x01
- FlagHasName = 0x02
- FlagHasMime = 0x04
- FlagHasLastModifiedDate = 0x08
- FlagHasTtl = 0x10
- FlagIsChunkManifest = 0x80
- LastModifiedBytesLength = 5
- TtlBytesLength = 2
-)
-
-func (n *Needle) DiskSize() int64 {
- padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize)
- return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize
-}
-func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
- if s, ok := w.(io.Seeker); ok {
- if end, e := s.Seek(0, 1); e == nil {
- defer func(s io.Seeker, off int64) {
- if err != nil {
- if _, e = s.Seek(off, 0); e != nil {
- glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
- }
- }
- }(s, end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
- return
- }
- }
- switch version {
- case Version1:
- header := make([]byte, NeedleHeaderSize)
- util.Uint32toBytes(header[0:4], n.Cookie)
- util.Uint64toBytes(header[4:12], n.Id)
- n.Size = uint32(len(n.Data))
- size = n.Size
- util.Uint32toBytes(header[12:16], n.Size)
- if _, err = w.Write(header); err != nil {
- return
- }
- if _, err = w.Write(n.Data); err != nil {
- return
- }
- padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
- util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- _, err = w.Write(header[0 : NeedleChecksumSize+padding])
- return
- case Version2:
- header := make([]byte, NeedleHeaderSize)
- util.Uint32toBytes(header[0:4], n.Cookie)
- util.Uint64toBytes(header[4:12], n.Id)
- n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
- if n.DataSize > 0 {
- n.Size = 4 + n.DataSize + 1
- if n.HasName() {
- n.Size = n.Size + 1 + uint32(n.NameSize)
- }
- if n.HasMime() {
- n.Size = n.Size + 1 + uint32(n.MimeSize)
- }
- if n.HasLastModifiedDate() {
- n.Size = n.Size + LastModifiedBytesLength
- }
- if n.HasTtl() {
- n.Size = n.Size + TtlBytesLength
- }
- } else {
- n.Size = 0
- }
- size = n.DataSize
- util.Uint32toBytes(header[12:16], n.Size)
- if _, err = w.Write(header); err != nil {
- return
- }
- if n.DataSize > 0 {
- util.Uint32toBytes(header[0:4], n.DataSize)
- if _, err = w.Write(header[0:4]); err != nil {
- return
- }
- if _, err = w.Write(n.Data); err != nil {
- return
- }
- util.Uint8toBytes(header[0:1], n.Flags)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if n.HasName() {
- util.Uint8toBytes(header[0:1], n.NameSize)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if _, err = w.Write(n.Name); err != nil {
- return
- }
- }
- if n.HasMime() {
- util.Uint8toBytes(header[0:1], n.MimeSize)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if _, err = w.Write(n.Mime); err != nil {
- return
- }
- }
- if n.HasLastModifiedDate() {
- util.Uint64toBytes(header[0:8], n.LastModified)
- if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
- return
- }
- }
- if n.HasTtl() && n.Ttl != nil {
- n.Ttl.ToBytes(header[0:TtlBytesLength])
- if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
- return
- }
- }
- }
- padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
- util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- _, err = w.Write(header[0 : NeedleChecksumSize+padding])
- return n.DataSize, err
- }
- return 0, fmt.Errorf("Unsupported Version! (%d)", version)
-}
-
-func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
- padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
- readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
- return getBytesForFileBlock(r, offset, int(readSize))
-}
-
-func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
- bytes, block, err := ReadNeedleBlob(r, offset, size)
- if err != nil {
- return err
- }
- n.rawBlock = block
- n.ParseNeedleHeader(bytes)
- if n.Size != size {
- return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
- }
- switch version {
- case Version1:
- n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
- case Version2:
- n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
- }
- checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
- newChecksum := NewCRC(n.Data)
- if checksum != newChecksum.Value() {
- return errors.New("CRC error! Data On Disk Corrupted")
- }
- n.Checksum = newChecksum
- return nil
-}
-func (n *Needle) ParseNeedleHeader(bytes []byte) {
- n.Cookie = util.BytesToUint32(bytes[0:4])
- n.Id = util.BytesToUint64(bytes[4:12])
- n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
-}
-func (n *Needle) readNeedleDataVersion2(bytes []byte) {
- index, lenBytes := 0, len(bytes)
- if index < lenBytes {
- n.DataSize = util.BytesToUint32(bytes[index : index+4])
- index = index + 4
- if int(n.DataSize)+index > lenBytes {
-			// this if clause is due to bugs #87 and #93, fixed in v0.69
- // remove this clause later
- return
- }
- n.Data = bytes[index : index+int(n.DataSize)]
- index = index + int(n.DataSize)
- n.Flags = bytes[index]
- index = index + 1
- }
- if index < lenBytes && n.HasName() {
- n.NameSize = uint8(bytes[index])
- index = index + 1
- n.Name = bytes[index : index+int(n.NameSize)]
- index = index + int(n.NameSize)
- }
- if index < lenBytes && n.HasMime() {
- n.MimeSize = uint8(bytes[index])
- index = index + 1
- n.Mime = bytes[index : index+int(n.MimeSize)]
- index = index + int(n.MimeSize)
- }
- if index < lenBytes && n.HasLastModifiedDate() {
- n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
- index = index + LastModifiedBytesLength
- }
- if index < lenBytes && n.HasTtl() {
- n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
- index = index + TtlBytesLength
- }
-}
-
-func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
- n = new(Needle)
- if version == Version1 || version == Version2 {
- bytes := make([]byte, NeedleHeaderSize)
- var count int
- count, err = r.ReadAt(bytes, offset)
- if count <= 0 || err != nil {
- return nil, 0, err
- }
- n.ParseNeedleHeader(bytes)
- padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
- bodyLength = n.Size + NeedleChecksumSize + padding
- }
- return
-}
-
-//n should be a needle whose header has already been read;
-//this reads the body from the stream, up to the start of the next file entry
-func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
- if bodyLength <= 0 {
- return nil
- }
- switch version {
- case Version1:
- bytes := make([]byte, bodyLength)
- if _, err = r.ReadAt(bytes, offset); err != nil {
- return
- }
- n.Data = bytes[:n.Size]
- n.Checksum = NewCRC(n.Data)
- case Version2:
- bytes := make([]byte, bodyLength)
- if _, err = r.ReadAt(bytes, offset); err != nil {
- return
- }
- n.readNeedleDataVersion2(bytes[0:n.Size])
- n.Checksum = NewCRC(n.Data)
- default:
- err = fmt.Errorf("Unsupported Version! (%d)", version)
- }
- return
-}
-
-func (n *Needle) IsGzipped() bool {
- return n.Flags&FlagGzip > 0
-}
-func (n *Needle) SetGzipped() {
- n.Flags = n.Flags | FlagGzip
-}
-func (n *Needle) HasName() bool {
- return n.Flags&FlagHasName > 0
-}
-func (n *Needle) SetHasName() {
- n.Flags = n.Flags | FlagHasName
-}
-func (n *Needle) HasMime() bool {
- return n.Flags&FlagHasMime > 0
-}
-func (n *Needle) SetHasMime() {
- n.Flags = n.Flags | FlagHasMime
-}
-func (n *Needle) HasLastModifiedDate() bool {
- return n.Flags&FlagHasLastModifiedDate > 0
-}
-func (n *Needle) SetHasLastModifiedDate() {
- n.Flags = n.Flags | FlagHasLastModifiedDate
-}
-func (n *Needle) HasTtl() bool {
- return n.Flags&FlagHasTtl > 0
-}
-func (n *Needle) SetHasTtl() {
- n.Flags = n.Flags | FlagHasTtl
-}
-
-func (n *Needle) IsChunkedManifest() bool {
- return n.Flags&FlagIsChunkManifest > 0
-}
-
-func (n *Needle) SetIsChunkManifest() {
- n.Flags = n.Flags | FlagIsChunkManifest
-}
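
DiskSize and Append pad every needle so that the next entry starts on an 8-byte boundary; the padding is always between 1 and 8 bytes. A self-contained sketch of the arithmetic:

package main

import "fmt"

const (
	needleHeaderSize   = 16
	needlePaddingSize  = 8
	needleChecksumSize = 4
)

// diskSize mirrors Needle.DiskSize above for a given Size field.
func diskSize(size int64) int64 {
	padding := needlePaddingSize - ((needleHeaderSize + size + needleChecksumSize) % needlePaddingSize)
	return needleHeaderSize + size + padding + needleChecksumSize
}

func main() {
	for _, s := range []int64{0, 1, 4, 12, 100} {
		fmt.Println(s, "->", diskSize(s)) // every result is a multiple of 8
	}
}
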
diff --git a/go/storage/needle_test.go b/go/storage/needle_test.go
deleted file mode 100644
index c05afda2f..000000000
--- a/go/storage/needle_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package storage
-
-import "testing"
-
-func TestParseKeyHash(t *testing.T) {
- testcases := []struct {
- KeyHash string
- ID uint64
- Cookie uint32
- Err bool
- }{
- // normal
- {"4ed4c8116e41", 0x4ed4, 0xc8116e41, false},
- // cookie with leading zeros
- {"4ed401116e41", 0x4ed4, 0x01116e41, false},
- // odd length
- {"ed400116e41", 0xed4, 0x00116e41, false},
-		// key using the full uint64 width
- {"fed4c8114ed4c811f0116e41", 0xfed4c8114ed4c811, 0xf0116e41, false},
- // err: too short
- {"4ed4c811", 0, 0, true},
- // err: too long
- {"4ed4c8114ed4c8114ed4c8111", 0, 0, true},
- // err: invalid character
- {"helloworld", 0, 0, true},
- }
-
- for _, tc := range testcases {
- if id, cookie, err := ParseKeyHash(tc.KeyHash); err != nil && !tc.Err {
- t.Fatalf("Parse %s error: %v", tc.KeyHash, err)
- } else if err == nil && tc.Err {
- t.Fatalf("Parse %s expected error got nil", tc.KeyHash)
- } else if id != tc.ID || cookie != tc.Cookie {
- t.Fatalf("Parse %s wrong result. Expected: (%d, %d) got: (%d, %d)", tc.KeyHash, tc.ID, tc.Cookie, id, cookie)
- }
- }
-}
-
-func BenchmarkParseKeyHash(b *testing.B) {
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- ParseKeyHash("4ed44ed44ed44ed4c8116e41")
- }
-}
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
deleted file mode 100644
index c1aca52eb..000000000
--- a/go/storage/replica_placement.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package storage
-
-import (
- "errors"
- "fmt"
-)
-
-type ReplicaPlacement struct {
- SameRackCount int
- DiffRackCount int
- DiffDataCenterCount int
-}
-
-func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
- rp := &ReplicaPlacement{}
- for i, c := range t {
- count := int(c - '0')
- if 0 <= count && count <= 2 {
- switch i {
- case 0:
- rp.DiffDataCenterCount = count
- case 1:
- rp.DiffRackCount = count
- case 2:
- rp.SameRackCount = count
- }
- } else {
- return rp, errors.New("Unknown Replication Type:" + t)
- }
- }
- return rp, nil
-}
-
-func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
- return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b))
-}
-
-func (rp *ReplicaPlacement) Byte() byte {
- ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
- return byte(ret)
-}
-
-func (rp *ReplicaPlacement) String() string {
- b := make([]byte, 3)
- b[0] = byte(rp.DiffDataCenterCount + '0')
- b[1] = byte(rp.DiffRackCount + '0')
- b[2] = byte(rp.SameRackCount + '0')
- return string(b)
-}
-
-func (rp *ReplicaPlacement) GetCopyCount() int {
- return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
-}
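
The three digits of a placement string "xyz" mean x copies on servers in other data centers, y on servers in other racks, and z on other servers in the same rack, so the total copy count is x+y+z+1. A sketch using the API above (pre-deletion import path):

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func main() {
	rp, err := storage.NewReplicaPlacementFromString("001")
	if err != nil {
		log.Fatal(err)
	}
	// "001": one extra copy on another server in the same rack.
	fmt.Println(rp.String(), "copies:", rp.GetCopyCount()) // 001 copies: 2
}
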
diff --git a/go/storage/replica_placement_test.go b/go/storage/replica_placement_test.go
deleted file mode 100644
index 9c2161e94..000000000
--- a/go/storage/replica_placement_test.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package storage
-
-import (
- "testing"
-)
-
-func TestReplicaPlacemnetSerialDeserial(t *testing.T) {
- rp, _ := NewReplicaPlacementFromString("001")
- new_rp, _ := NewReplicaPlacementFromByte(rp.Byte())
- if rp.String() != new_rp.String() {
- println("expected:", rp.String(), "actual:", new_rp.String())
- t.Fail()
- }
-}
diff --git a/go/storage/store.go b/go/storage/store.go
deleted file mode 100644
index dd312c075..000000000
--- a/go/storage/store.go
+++ /dev/null
@@ -1,340 +0,0 @@
-package storage
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "math/rand"
- "strconv"
- "strings"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/operation"
- "github.com/chrislusf/seaweedfs/go/security"
- "github.com/chrislusf/seaweedfs/go/util"
- "github.com/golang/protobuf/proto"
-)
-
-const (
- MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
-)
-
-type MasterNodes struct {
- nodes []string
- lastNode int
-}
-
-func (mn *MasterNodes) String() string {
- return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
-}
-
-func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
- mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
- return
-}
-func (mn *MasterNodes) reset() {
- glog.V(4).Infof("Resetting master nodes: %v", mn)
- if len(mn.nodes) > 1 && mn.lastNode >= 0 {
- glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
- mn.lastNode = -mn.lastNode - 1
- }
-}
-func (mn *MasterNodes) findMaster() (string, error) {
- if len(mn.nodes) == 0 {
- return "", errors.New("No master node found!")
- }
- if mn.lastNode < 0 {
- for _, m := range mn.nodes {
- glog.V(4).Infof("Listing masters on %s", m)
- if masters, e := operation.ListMasters(m); e == nil {
- if len(masters) == 0 {
- continue
- }
- mn.nodes = append(masters, m)
- mn.lastNode = rand.Intn(len(mn.nodes))
- glog.V(2).Infof("current master nodes is %v", mn)
- break
- } else {
- glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
- }
- }
- }
- if mn.lastNode < 0 {
- return "", errors.New("No master node available!")
- }
- return mn.nodes[mn.lastNode], nil
-}
-
-/*
- * A VolumeServer contains one Store
- */
-type Store struct {
- Ip string
- Port int
- PublicUrl string
- Locations []*DiskLocation
-	dataCenter string //optional information, overriding the master setting if present
-	rack string //optional information, overriding the master setting if present
- connected bool
- volumeSizeLimit uint64 //read from the master
- masterNodes *MasterNodes
-}
-
-func (s *Store) String() (str string) {
- str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
- return
-}
-
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
- s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
- s.Locations = make([]*DiskLocation, 0)
- for i := 0; i < len(dirnames); i++ {
- location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
- location.loadExistingVolumes(needleMapKind)
- s.Locations = append(s.Locations, location)
- }
- return
-}
-func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
- rt, e := NewReplicaPlacementFromString(replicaPlacement)
- if e != nil {
- return e
- }
- ttl, e := ReadTTL(ttlString)
- if e != nil {
- return e
- }
- for _, range_string := range strings.Split(volumeListString, ",") {
- if strings.Index(range_string, "-") < 0 {
- id_string := range_string
- id, err := NewVolumeId(id_string)
- if err != nil {
- return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
- }
- e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
- } else {
- pair := strings.Split(range_string, "-")
- start, start_err := strconv.ParseUint(pair[0], 10, 64)
- if start_err != nil {
- return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
- }
- end, end_err := strconv.ParseUint(pair[1], 10, 64)
- if end_err != nil {
- return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
- }
- for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
- e = err
- }
- }
- }
- }
- return e
-}
-func (s *Store) DeleteCollection(collection string) (e error) {
- for _, location := range s.Locations {
- e = location.DeleteCollectionFromDiskLocation(collection)
- if e != nil {
- return
- }
- }
- return
-}
-
-func (s *Store) findVolume(vid VolumeId) *Volume {
- for _, location := range s.Locations {
- if v, found := location.volumes[vid]; found {
- return v
- }
- }
- return nil
-}
-func (s *Store) findFreeLocation() (ret *DiskLocation) {
- max := 0
- for _, location := range s.Locations {
- currentFreeCount := location.MaxVolumeCount - len(location.volumes)
- if currentFreeCount > max {
- max = currentFreeCount
- ret = location
- }
- }
- return ret
-}
-func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
- if s.findVolume(vid) != nil {
- return fmt.Errorf("Volume Id %d already exists!", vid)
- }
- if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
- location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
- location.volumes[vid] = volume
- return nil
- } else {
- return err
- }
- }
- return fmt.Errorf("No more free space left")
-}
-
-func (s *Store) Status() []*VolumeInfo {
- var stats []*VolumeInfo
- for _, location := range s.Locations {
- for k, v := range location.volumes {
- s := &VolumeInfo{
- Id: VolumeId(k),
- Size: v.ContentSize(),
- Collection: v.Collection,
- ReplicaPlacement: v.ReplicaPlacement,
- Version: v.Version(),
- FileCount: v.nm.FileCount(),
- DeleteCount: v.nm.DeletedCount(),
- DeletedByteCount: v.nm.DeletedSize(),
- ReadOnly: v.readOnly,
- Ttl: v.Ttl}
- stats = append(stats, s)
- }
- }
- sortVolumeInfos(stats)
- return stats
-}
-
-func (s *Store) SetDataCenter(dataCenter string) {
- s.dataCenter = dataCenter
-}
-func (s *Store) SetRack(rack string) {
- s.rack = rack
-}
-
-func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
- s.masterNodes = NewMasterNodes(bootstrapMaster)
-}
-func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
- masterNode, e = s.masterNodes.findMaster()
- if e != nil {
- return
- }
- var volumeMessages []*operation.VolumeInformationMessage
- maxVolumeCount := 0
- var maxFileKey uint64
- for _, location := range s.Locations {
- maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
- for k, v := range location.volumes {
- if maxFileKey < v.nm.MaxFileKey() {
- maxFileKey = v.nm.MaxFileKey()
- }
- if !v.expired(s.volumeSizeLimit) {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- Ttl: proto.Uint32(v.Ttl.ToUint32()),
- }
- volumeMessages = append(volumeMessages, volumeMessage)
- } else {
- if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
- location.deleteVolumeById(v.Id)
- glog.V(0).Infoln("volume", v.Id, "is deleted.")
- } else {
- glog.V(0).Infoln("volume", v.Id, "is expired.")
- }
- }
- }
- }
-
- joinMessage := &operation.JoinMessage{
- IsInit: proto.Bool(!s.connected),
- Ip: proto.String(s.Ip),
- Port: proto.Uint32(uint32(s.Port)),
- PublicUrl: proto.String(s.PublicUrl),
- MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
- MaxFileKey: proto.Uint64(maxFileKey),
- DataCenter: proto.String(s.dataCenter),
- Rack: proto.String(s.rack),
- Volumes: volumeMessages,
- }
-
- data, err := proto.Marshal(joinMessage)
- if err != nil {
- return "", "", err
- }
-
- joinUrl := "http://" + masterNode + "/dir/join"
- glog.V(4).Infof("Connecting to %s ...", joinUrl)
-
- jsonBlob, err := util.PostBytes(joinUrl, data)
- if err != nil {
- s.masterNodes.reset()
- return "", "", err
- }
- var ret operation.JoinResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
- s.masterNodes.reset()
- return masterNode, "", err
- }
- if ret.Error != "" {
- s.masterNodes.reset()
- return masterNode, "", errors.New(ret.Error)
- }
- s.volumeSizeLimit = ret.VolumeSizeLimit
- secretKey = security.Secret(ret.SecretKey)
- s.connected = true
- return
-}
-func (s *Store) Close() {
- for _, location := range s.Locations {
- for _, v := range location.volumes {
- v.Close()
- }
- }
-}
-func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
- if v := s.findVolume(i); v != nil {
- if v.readOnly {
- err = fmt.Errorf("Volume %d is read only", i)
- return
- }
- // the exact on-disk size is only known after Append, so estimate the
- // growth with the raw data length instead of the not-yet-set size
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(len(n.Data)) {
- size, err = v.write(n)
- } else {
- err = fmt.Errorf("Volume %d is at its maximum addressable size! Current size is %d", i, v.ContentSize())
- }
- // report to the master once the volume is within roughly three more
- // writes of the soft volumeSizeLimit, so it stops assigning file ids here
- if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
- glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
- if _, _, e := s.SendHeartbeatToMaster(); e != nil {
- glog.V(0).Infoln("error when reporting size:", e)
- }
- }
- return
- }
- glog.V(0).Infoln("volume", i, "not found!")
- err = fmt.Errorf("Volume %d not found!", i)
- return
-}
-func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
- // deletes on missing or read-only volumes are silently ignored
- if v := s.findVolume(i); v != nil && !v.readOnly {
- return v.delete(n)
- }
- return 0, nil
-}
-func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
- if v := s.findVolume(i); v != nil {
- return v.readNeedle(n)
- }
- return 0, fmt.Errorf("Volume %v not found!", i)
-}
-func (s *Store) GetVolume(i VolumeId) *Volume {
- return s.findVolume(i)
-}
-
-func (s *Store) HasVolume(i VolumeId) bool {
- v := s.findVolume(i)
- return v != nil
-}
diff --git a/go/storage/store_vacuum.go b/go/storage/store_vacuum.go
deleted file mode 100644
index 52343c898..000000000
--- a/go/storage/store_vacuum.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package storage
-
-import (
- "fmt"
- "strconv"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString), false
- }
- garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
- if e != nil {
- return fmt.Errorf("garbageThreshold %s is not a valid float number", garbageThresholdString), false
- }
- if v := s.findVolume(vid); v != nil {
- glog.V(3).Infoln(vid, "garbage level is", v.garbageLevel())
- return nil, garbageThreshold < v.garbageLevel()
- }
- return fmt.Errorf("volume id %d is not found during check compact", vid), false
-}
-func (s *Store) CompactVolume(volumeIdString string) error {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
- }
- if v := s.findVolume(vid); v != nil {
- return v.Compact()
- }
- return fmt.Errorf("volume id %d is not found during compact", vid)
-}
-func (s *Store) CommitCompactVolume(volumeIdString string) error {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
- }
- if v := s.findVolume(vid); v != nil {
- return v.commitCompact()
- }
- return fmt.Errorf("volume id %d is not found during commit compact", vid)
-}
diff --git a/go/storage/volume.go b/go/storage/volume.go
deleted file mode 100644
index 7e330a9e4..000000000
--- a/go/storage/volume.go
+++ /dev/null
@@ -1,430 +0,0 @@
-package storage
-
-import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "os"
- "path"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-type Volume struct {
- Id VolumeId
- dir string
- Collection string
- dataFile *os.File
- nm NeedleMapper
- needleMapKind NeedleMapType
- readOnly bool
-
- SuperBlock
-
- dataFileAccessLock sync.Mutex
- lastModifiedTime uint64 //unix time in seconds
-}
-
-func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
- v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
- v.needleMapKind = needleMapKind
- e = v.load(true, true, needleMapKind)
- return
-}
-func (v *Volume) String() string {
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
-}
-
-func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
- v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{}
- v.needleMapKind = needleMapKind
- e = v.load(false, false, needleMapKind)
- return
-}
-func (v *Volume) FileName() (fileName string) {
- if v.Collection == "" {
- fileName = path.Join(v.dir, v.Id.String())
- } else {
- fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
- }
- return
-}
-func (v *Volume) DataFile() *os.File {
- return v.dataFile
-}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
- var e error
- fileName := v.FileName()
-
- if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
- if !canRead {
- return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
- }
- if canWrite {
- v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
- v.lastModifiedTime = uint64(modifiedTime.Unix())
- } else {
- glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
- v.dataFile, e = os.Open(fileName + ".dat")
- v.readOnly = true
- }
- } else {
- if createDatIfMissing {
- v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
- } else {
- return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
- }
- }
-
- if e != nil {
- if !os.IsPermission(e) {
- return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
- }
- }
-
- if v.ReplicaPlacement == nil {
- e = v.readSuperBlock()
- } else {
- e = v.maybeWriteSuperBlock()
- }
- if e == nil && alsoLoadIndex {
- var indexFile *os.File
- if v.readOnly {
- glog.V(1).Infoln("open to read file", fileName+".idx")
- if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
- return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
- }
- } else {
- glog.V(1).Infoln("open to write file", fileName+".idx")
- if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
- return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
- }
- }
- switch needleMapKind {
- case NeedleMapInMemory:
- glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
- if v.nm, e = LoadNeedleMap(indexFile); e != nil {
- glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
- }
- case NeedleMapLevelDb:
- glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
- }
- case NeedleMapBoltDb:
- glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
- if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
- glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
- }
- }
- }
- return e
-}
-func (v *Volume) Version() Version {
- return v.SuperBlock.Version()
-}
-func (v *Volume) Size() int64 {
- stat, e := v.dataFile.Stat()
- if e == nil {
- return stat.Size()
- }
- glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
- return -1
-}
-
-// Close cleanly shuts down this volume
-func (v *Volume) Close() {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- v.nm.Close()
- _ = v.dataFile.Close()
-}
-
-func (v *Volume) NeedToReplicate() bool {
- return v.ReplicaPlacement.GetCopyCount() > 1
-}
-
-// isFileUnchanged checks whether the needle being written is identical to the
-// last stored version. It requires serialized access to the same volume.
-func (v *Volume) isFileUnchanged(n *Needle) bool {
- if v.Ttl.String() != "" {
- return false
- }
- nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- oldNeedle := new(Needle)
- err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
- if err != nil {
- glog.V(0).Infof("Failed to check updated file %v", err)
- return false
- }
- defer oldNeedle.ReleaseMemory()
- if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
- n.DataSize = oldNeedle.DataSize
- return true
- }
- }
- return false
-}
-
-// Destroy removes everything related to this volume
-func (v *Volume) Destroy() (err error) {
- if v.readOnly {
- err = fmt.Errorf("%s is read-only", v.dataFile.Name())
- return
- }
- v.Close()
- err = os.Remove(v.dataFile.Name())
- if err != nil {
- return
- }
- err = v.nm.Destroy()
- return
-}
-
-// AppendBlob appends a blob to the end of the data file; used in replication
-func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
- if v.readOnly {
- err = fmt.Errorf("%s is read-only", v.dataFile.Name())
- return
- }
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- if offset, err = v.dataFile.Seek(0, 2); err != nil {
- glog.V(0).Infof("failed to seek the end of file: %v", err)
- return
- }
- //ensure writes start from an aligned position
- if offset%NeedlePaddingSize != 0 {
- offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
- if offset, err = v.dataFile.Seek(offset, 0); err != nil {
- glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
- return
- }
- }
- _, err = v.dataFile.Write(b)
- return
-}
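-
-// alignToPadding is an illustrative helper (an assumption, not in the original
-// file) capturing the rounding rule used by AppendBlob above and write below:
-// offsets are rounded up to the next NeedlePaddingSize boundary so that every
-// needle starts at an aligned position.
-func alignToPadding(offset int64) int64 {
- if rem := offset % NeedlePaddingSize; rem != 0 {
- offset += NeedlePaddingSize - rem
- }
- return offset
-}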
-
-func (v *Volume) write(n *Needle) (size uint32, err error) {
- glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
- if v.readOnly {
- err = fmt.Errorf("%s is read-only", v.dataFile.Name())
- return
- }
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- if v.isFileUnchanged(n) {
- size = n.DataSize
- glog.V(4).Infof("needle is unchanged!")
- return
- }
- var offset int64
- if offset, err = v.dataFile.Seek(0, 2); err != nil {
- glog.V(0).Infof("failed to seek the end of file: %v", err)
- return
- }
-
- //ensure writes start from an aligned position
- if offset%NeedlePaddingSize != 0 {
- offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
- if offset, err = v.dataFile.Seek(offset, 0); err != nil {
- glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
- return
- }
- }
-
- if size, err = n.Append(v.dataFile, v.Version()); err != nil {
- if e := v.dataFile.Truncate(offset); e != nil {
- err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
- }
- return
- }
- nv, ok := v.nm.Get(n.Id)
- if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
- if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
- glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
- }
- }
- if v.lastModifiedTime < n.LastModified {
- v.lastModifiedTime = n.LastModified
- }
- return
-}
-
-func (v *Volume) delete(n *Needle) (uint32, error) {
- glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
- if v.readOnly {
- return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
- }
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok {
- size := nv.Size
- if err := v.nm.Delete(n.Id); err != nil {
- return size, err
- }
- if _, err := v.dataFile.Seek(0, 2); err != nil {
- return size, err
- }
- n.Data = nil
- _, err := n.Append(v.dataFile, v.Version())
- return size, err
- }
- return 0, nil
-}
-
-// readNeedle fills in the needle content by looking up n.Id from the NeedleMapper
-func (v *Volume) readNeedle(n *Needle) (int, error) {
- nv, ok := v.nm.Get(n.Id)
- if !ok || nv.Offset == 0 {
- return -1, errors.New("Not Found")
- }
- err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
- if err != nil {
- return 0, err
- }
- bytesRead := len(n.Data)
- if !n.HasTtl() {
- return bytesRead, nil
- }
- ttlMinutes := n.Ttl.Minutes()
- if ttlMinutes == 0 {
- return bytesRead, nil
- }
- if !n.HasLastModifiedDate() {
- return bytesRead, nil
- }
- if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
- return bytesRead, nil
- }
- n.ReleaseMemory()
- return -1, errors.New("Not Found")
-}
-
-func ScanVolumeFile(dirname string, collection string, id VolumeId,
- needleMapKind NeedleMapType,
- visitSuperBlock func(SuperBlock) error,
- readNeedleBody bool,
- visitNeedle func(n *Needle, offset int64) error) (err error) {
- var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
- return fmt.Errorf("Failed to load volume %d: %v", id, err)
- }
- if err = visitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
- }
-
- version := v.Version()
-
- offset := int64(SuperBlockSize)
- n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
- if e != nil {
- err = fmt.Errorf("cannot read needle header: %v", e)
- return
- }
- for n != nil {
- if readNeedleBody {
- if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
- glog.V(0).Infof("cannot read needle body: %v", err)
- //err = fmt.Errorf("cannot read needle body: %v", err)
- //return
- }
- if n.DataSize >= n.Size {
- // this should stem from the bug reported in #87 and #93,
- // fixed in v0.69
- // remove this whole "if" clause long after 0.69
- oldRest, oldSize := rest, n.Size
- padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
- n.Size = 0
- rest = n.Size + NeedleChecksumSize + padding
- if rest%NeedlePaddingSize != 0 {
- rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
- }
- glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
- }
- }
- if err = visitNeedle(n, offset); err != nil {
- glog.V(0).Infof("visit needle error: %v", err)
- }
- offset += int64(NeedleHeaderSize) + int64(rest)
- glog.V(4).Infof("==> new entry offset %d", offset)
- if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read needle header: %v", err)
- }
- glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
- }
-
- return
-}
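-
-// Illustrative usage (hypothetical, not in the original file): counting live
-// needles with ScanVolumeFile's callback API.
-//
-// liveCount := 0
-// err := ScanVolumeFile(dir, collection, id, NeedleMapInMemory,
-// func(sb SuperBlock) error { return nil }, // skip the super block
-// false, // headers only, no needle bodies
-// func(n *Needle, offset int64) error {
-// if n.Size > 0 {
-// liveCount++
-// }
-// return nil
-// })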
-
-func (v *Volume) ContentSize() uint64 {
- return v.nm.ContentSize()
-}
-
-func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
- exists = true
- fi, err := os.Stat(filename)
- if os.IsNotExist(err) {
- exists = false
- return
- }
- if fi.Mode()&0400 != 0 {
- canRead = true
- }
- if fi.Mode()&0200 != 0 {
- canWrite = true
- }
- modTime = fi.ModTime()
- return
-}
-
-// expired reports whether lastModifiedTime + ttl is in the past.
-// It returns false when the volume is empty,
-// when the volume has no ttl,
-// or when volumeSizeLimit is 0 (the server has just started and does not
-// know the size limit yet)
-func (v *Volume) expired(volumeSizeLimit uint64) bool {
- if volumeSizeLimit == 0 {
- //skip if we don't know size limit
- return false
- }
- if v.ContentSize() == 0 {
- return false
- }
- if v.Ttl == nil || v.Ttl.Minutes() == 0 {
- return false
- }
- glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
- livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
- glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
- if int64(v.Ttl.Minutes()) < livedMinutes {
- return true
- }
- return false
-}
-
-// expiredLongEnough reports whether the volume has stayed expired for the
-// removal delay: the smaller of maxDelayMinutes and 10% of the ttl
-func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
- if v.Ttl == nil || v.Ttl.Minutes() == 0 {
- return false
- }
- removalDelay := v.Ttl.Minutes() / 10
- if removalDelay > maxDelayMinutes {
- removalDelay = maxDelayMinutes
- }
-
- if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
- return true
- }
- return false
-}
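-
-// Worked example (illustrative): with a 15d ttl, Ttl.Minutes() is 21600, so
-// the removal delay is min(2160, maxDelayMinutes) minutes; the volume is only
-// removed once now exceeds lastModifiedTime + (21600+delay)*60 seconds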
diff --git a/go/storage/volume_id.go b/go/storage/volume_id.go
deleted file mode 100644
index 0333c6cf0..000000000
--- a/go/storage/volume_id.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package storage
-
-import (
- "strconv"
-)
-
-type VolumeId uint32
-
-func NewVolumeId(vid string) (VolumeId, error) {
- // parse with bitSize 32 so values that overflow VolumeId are rejected
- volumeId, err := strconv.ParseUint(vid, 10, 32)
- return VolumeId(volumeId), err
-}
-func (vid VolumeId) String() string {
- return strconv.FormatUint(uint64(vid), 10)
-}
-func (vid VolumeId) Next() VolumeId {
- return VolumeId(uint32(vid) + 1)
-}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
deleted file mode 100644
index a2f139c89..000000000
--- a/go/storage/volume_info.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package storage
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/go/operation"
- "sort"
-)
-
-type VolumeInfo struct {
- Id VolumeId
- Size uint64
- ReplicaPlacement *ReplicaPlacement
- Ttl *TTL
- Collection string
- Version Version
- FileCount int
- DeleteCount int
- DeletedByteCount uint64
- ReadOnly bool
-}
-
-func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
- vi = VolumeInfo{
- Id: VolumeId(*m.Id),
- Size: *m.Size,
- Collection: *m.Collection,
- FileCount: int(*m.FileCount),
- DeleteCount: int(*m.DeleteCount),
- DeletedByteCount: *m.DeletedByteCount,
- ReadOnly: *m.ReadOnly,
- Version: Version(*m.Version),
- }
- rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
- if e != nil {
- return vi, e
- }
- vi.ReplicaPlacement = rp
- vi.Ttl = LoadTTLFromUint32(*m.Ttl)
- return vi, nil
-}
-
-func (vi VolumeInfo) String() string {
- return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
- vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
-}
-
-/* VolumeInfo sorting */
-
-type volumeInfos []*VolumeInfo
-
-func (vis volumeInfos) Len() int {
- return len(vis)
-}
-
-func (vis volumeInfos) Less(i, j int) bool {
- return vis[i].Id < vis[j].Id
-}
-
-func (vis volumeInfos) Swap(i, j int) {
- vis[i], vis[j] = vis[j], vis[i]
-}
-
-func sortVolumeInfos(vis volumeInfos) {
- sort.Sort(vis)
-}
diff --git a/go/storage/volume_info_test.go b/go/storage/volume_info_test.go
deleted file mode 100644
index 9a9c43ad2..000000000
--- a/go/storage/volume_info_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package storage
-
-import "testing"
-
-func TestSortVolumeInfos(t *testing.T) {
- vis := []*VolumeInfo{
- {Id: 2},
- {Id: 1},
- {Id: 3},
- }
- sortVolumeInfos(vis)
- for i := 0; i < len(vis); i++ {
- if vis[i].Id != VolumeId(i+1) {
- t.Fatalf("unexpected volume id %d at index %d", vis[i].Id, i)
- }
- }
-}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
deleted file mode 100644
index e37360075..000000000
--- a/go/storage/volume_super_block.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package storage
-
-import (
- "fmt"
- "os"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-const (
- SuperBlockSize = 8
-)
-
-/*
-* Super block currently has 8 bytes allocated for each volume.
-* Byte 0: version, 1 or 2
-* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live. See TTL for definition
-* Byte 4 and byte 5: The number of times the volume has been compacted.
-* Rest bytes: Reserved
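-* Example (illustrative, not from the original comment): version 2, replica
-* placement "001", ttl "3m", never compacted encodes as
-* [0x02, 0x01, 0x03, Minute, 0x00, 0x00, 0x00, 0x00]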
- */
-type SuperBlock struct {
- version Version
- ReplicaPlacement *ReplicaPlacement
- Ttl *TTL
- CompactRevision uint16
-}
-
-func (s *SuperBlock) Version() Version {
- return s.version
-}
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, SuperBlockSize)
- header[0] = byte(s.version)
- header[1] = s.ReplicaPlacement.Byte()
- s.Ttl.ToBytes(header[2:4])
- util.Uint16toBytes(header[4:6], s.CompactRevision)
- return header
-}
-
-func (v *Volume) maybeWriteSuperBlock() error {
- stat, e := v.dataFile.Stat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile, e)
- return e
- }
- if stat.Size() == 0 {
- v.SuperBlock.version = CurrentVersion
- _, e = v.dataFile.Write(v.SuperBlock.Bytes())
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
- if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
- v.readOnly = false
- }
- }
- }
- }
- return e
-}
-func (v *Volume) readSuperBlock() (err error) {
- if _, err = v.dataFile.Seek(0, 0); err != nil {
- return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
- }
- header := make([]byte, SuperBlockSize)
- if _, e := v.dataFile.Read(header); e != nil {
- return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e)
- }
- v.SuperBlock, err = ParseSuperBlock(header)
- return err
-}
-func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
- superBlock.version = Version(header[0])
- if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err.Error())
- }
- superBlock.Ttl = LoadTTLFromBytes(header[2:4])
- superBlock.CompactRevision = util.BytesToUint16(header[4:6])
- return
-}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
deleted file mode 100644
index 13db4b194..000000000
--- a/go/storage/volume_super_block_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package storage
-
-import (
- "testing"
-)
-
-func TestSuperBlockReadWrite(t *testing.T) {
- rp, _ := NewReplicaPlacementFromByte(byte(001))
- ttl, _ := ReadTTL("15d")
- s := &SuperBlock{
- version: CurrentVersion,
- ReplicaPlacement: rp,
- Ttl: ttl,
- }
-
- bytes := s.Bytes()
-
- if !(bytes[2] == 15 && bytes[3] == Day) {
- println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
- t.Fail()
- }
-
-}
diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go
deleted file mode 100644
index 2c72d62f0..000000000
--- a/go/storage/volume_sync.go
+++ /dev/null
@@ -1,213 +0,0 @@
-package storage
-
-import (
- "fmt"
- "io"
- "io/ioutil"
- "net/url"
- "os"
- "sort"
- "strconv"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/operation"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-// A slave volume syncs with a master volume in 2 steps:
-// 1. The slave checks the master side to find the subscription checkpoint
-// to set up the replication.
-// 2. The slave receives the updates from the master
-
-/*
-Assume the slave volume needs to follow the master volume.
-
-The master volume could have been compacted, and could be many entries ahead
-of the slave volume.
-
-Step 1:
-The slave volume will ask the master volume for a snapshot
-of (existing file entries, last offset, number of compacted times).
-
-For each entry x in master existing file entries:
- if x does not exist locally:
- add x locally
-
-For each entry y in local slave existing file entries:
- if y does not exist on master:
- delete y locally
-
-Step 2:
-After this, use the last offset and number of compacted times to request
-the data appended to the master volume since that offset, and keep looping.
-If the number of compacted times has changed, go back to step 1 (very
-likely this can be optimized further later).
-
-*/
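-
-// Condensed sketch of step 1's delta computation (illustrative, mirroring
-// trySynchronizing below): entries present only on the master are fetched,
-// entries present only on the slave get Size 0 and are deleted locally.
-//
-// var delta []NeedleValue
-// masterMap.Visit(func(nv NeedleValue) error {
-// if _, ok := slaveMap.Get(uint64(nv.Key)); !ok {
-// delta = append(delta, nv) // fetch from master
-// }
-// return nil
-// })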
-
-func (v *Volume) Synchronize(volumeServer string) (err error) {
- var lastCompactRevision uint16 = 0
- var compactRevision uint16 = 0
- var masterMap CompactMap
- for i := 0; i < 3; i++ {
- if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
- return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
- }
- if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
- if err = v.Compact(); err != nil {
- return fmt.Errorf("failed to compact volume before synchronizing: %v", err)
- }
- if err = v.commitCompact(); err != nil {
- return fmt.Errorf("failed to commit compaction before synchronizing: %v", err)
- }
- }
- lastCompactRevision = compactRevision
- if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
- return
- }
- }
- return
-}
-
-type ByOffset []NeedleValue
-
-func (a ByOffset) Len() int { return len(a) }
-func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
-
-// trySynchronizing incrementally syncs with the remote volume server by
-// reconciling the local and remote delta.
-func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
- slaveIdxFile, err := os.Open(v.nm.IndexFileName())
- if err != nil {
- return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
- }
- defer slaveIdxFile.Close()
- slaveMap, err := LoadNeedleMap(slaveIdxFile)
- if err != nil {
- return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
- }
- var delta []NeedleValue
- if err := masterMap.Visit(func(needleValue NeedleValue) error {
- if needleValue.Key == 0 {
- return nil
- }
- if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok {
- return nil // skip intersection
- }
- delta = append(delta, needleValue)
- return nil
- }); err != nil {
- return fmt.Errorf("Add master entry: %v", err)
- }
- if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
- if needleValue.Key == 0 {
- return nil
- }
- if _, ok := masterMap.Get(needleValue.Key); ok {
- return nil // skip intersection
- }
- needleValue.Size = 0
- delta = append(delta, needleValue)
- return nil
- }); err != nil {
- return fmt.Errorf("Remove local entry: %v", err)
- }
-
- // sort to match the needle ordering of the remote .dat file
- sort.Sort(ByOffset(delta))
-
- // make up the delta
- fetchCount := 0
- volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
- for _, needleValue := range delta {
- if needleValue.Size == 0 {
- // remove file entry from local
- v.removeNeedle(needleValue.Key)
- continue
- }
- // add master file entry to local data file
- if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
- glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
- return err
- }
- fetchCount++
- }
- glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
- return nil
-}
-
-func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
- m = NewCompactMap()
-
- syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
- if err != nil {
- return m, 0, 0, err
- }
-
- total := 0
- err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
- // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
- if offset != 0 && size != 0 {
- m.Set(Key(key), offset, size)
- } else {
- m.Delete(Key(key))
- }
- total++
- })
-
- glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
- return m, syncStatus.TailOffset, syncStatus.CompactRevision, err
-
-}
-
-func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
- var syncStatus = operation.SyncVolumeResponse{}
- if stat, err := v.dataFile.Stat(); err == nil {
- syncStatus.TailOffset = uint64(stat.Size())
- }
- syncStatus.IdxFileSize = v.nm.IndexFileSize()
- syncStatus.CompactRevision = v.SuperBlock.CompactRevision
- syncStatus.Ttl = v.SuperBlock.Ttl.String()
- syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
- return syncStatus
-}
-
-func (v *Volume) IndexFileContent() ([]byte, error) {
- return v.nm.IndexFileContent()
-}
-
-// removeNeedle removes one needle by needle key
-func (v *Volume) removeNeedle(key Key) {
- n := new(Needle)
- n.Id = uint64(key)
- v.delete(n)
-}
-
-// fetchNeedle fetches a remote volume needle by vid, id, offset.
-// The compact revision is checked first in case the remote volume
-// was compacted and the offset is no longer valid.
-func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
- needleValue NeedleValue, compactRevision uint16) error {
- // add master file entry to local data file
- values := make(url.Values)
- values.Add("revision", strconv.Itoa(int(compactRevision)))
- values.Add("volume", v.Id.String())
- values.Add("id", needleValue.Key.String())
- values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
- values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
- glog.V(4).Infof("Fetch %+v", needleValue)
- return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
- b, err := ioutil.ReadAll(r)
- if err != nil {
- return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
- }
- offset, err := v.AppendBlob(b)
- if err != nil {
- return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
- }
- // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
- v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size)
- return nil
- })
-}
diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go
deleted file mode 100644
index 4318bb048..000000000
--- a/go/storage/volume_ttl.go
+++ /dev/null
@@ -1,135 +0,0 @@
-package storage
-
-import (
- "strconv"
-)
-
-const (
- //stored unit types
- Empty byte = iota
- Minute
- Hour
- Day
- Week
- Month
- Year
-)
-
-type TTL struct {
- count byte
- unit byte
-}
-
-var EMPTY_TTL = &TTL{}
-
-// ReadTTL translates a human-readable ttl to the internal TTL.
-// Supported formats, for example:
-// 3m: 3 minutes
-// 4h: 4 hours
-// 5d: 5 days
-// 6w: 6 weeks
-// 7M: 7 months
-// 8y: 8 years
-func ReadTTL(ttlString string) (*TTL, error) {
- if ttlString == "" {
- return EMPTY_TTL, nil
- }
- ttlBytes := []byte(ttlString)
- unitByte := ttlBytes[len(ttlBytes)-1]
- countBytes := ttlBytes[0 : len(ttlBytes)-1]
- if '0' <= unitByte && unitByte <= '9' {
- countBytes = ttlBytes
- unitByte = 'm'
- }
- // parse with bitSize 8 so counts that overflow one byte are rejected
- count, err := strconv.ParseUint(string(countBytes), 10, 8)
- unit := toStoredByte(unitByte)
- return &TTL{count: byte(count), unit: unit}, err
-}
-
-// read stored bytes to a ttl
-func LoadTTLFromBytes(input []byte) (t *TTL) {
- return &TTL{count: input[0], unit: input[1]}
-}
-
-// read a stored uint32 to a ttl
-func LoadTTLFromUint32(ttl uint32) (t *TTL) {
- input := make([]byte, 2)
- input[1] = byte(ttl)
- input[0] = byte(ttl >> 8)
- return LoadTTLFromBytes(input)
-}
-
-// ToBytes saves the ttl into a 2-byte output slice
-func (t *TTL) ToBytes(output []byte) {
- output[0] = t.count
- output[1] = t.unit
-}
-
-func (t *TTL) ToUint32() (output uint32) {
- output = uint32(t.count) << 8
- output += uint32(t.unit)
- return output
-}
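-
-// Worked example (illustrative): ReadTTL("3m") yields {count: 3, unit: Minute},
-// so ToUint32 returns 3<<8|uint32(Minute) = 0x0301, and
-// LoadTTLFromUint32(0x0301) restores an equivalent ttl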
-
-func (t *TTL) String() string {
- if t == nil || t.count == 0 {
- return ""
- }
- if t.unit == Empty {
- return ""
- }
- countString := strconv.Itoa(int(t.count))
- switch t.unit {
- case Minute:
- return countString + "m"
- case Hour:
- return countString + "h"
- case Day:
- return countString + "d"
- case Week:
- return countString + "w"
- case Month:
- return countString + "M"
- case Year:
- return countString + "y"
- }
- return ""
-}
-
-func toStoredByte(readableUnitByte byte) byte {
- switch readableUnitByte {
- case 'm':
- return Minute
- case 'h':
- return Hour
- case 'd':
- return Day
- case 'w':
- return Week
- case 'M':
- return Month
- case 'y':
- return Year
- }
- return 0
-}
-
-func (t TTL) Minutes() uint32 {
- switch t.unit {
- case Empty:
- return 0
- case Minute:
- return uint32(t.count)
- case Hour:
- return uint32(t.count) * 60
- case Day:
- return uint32(t.count) * 60 * 24
- case Week:
- return uint32(t.count) * 60 * 24 * 7
- case Month:
- return uint32(t.count) * 60 * 24 * 31
- case Year:
- return uint32(t.count) * 60 * 24 * 365
- }
- return 0
-}
diff --git a/go/storage/volume_ttl_test.go b/go/storage/volume_ttl_test.go
deleted file mode 100644
index 216469a4c..000000000
--- a/go/storage/volume_ttl_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package storage
-
-import (
- "testing"
-)
-
-func TestTTLReadWrite(t *testing.T) {
- ttl, _ := ReadTTL("")
- if ttl.Minutes() != 0 {
- t.Errorf("empty ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("9")
- if ttl.Minutes() != 9 {
- t.Errorf("9 ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("8m")
- if ttl.Minutes() != 8 {
- t.Errorf("8m ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("5h")
- if ttl.Minutes() != 300 {
- t.Errorf("5h ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("5d")
- if ttl.Minutes() != 5*24*60 {
- t.Errorf("5d ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("5w")
- if ttl.Minutes() != 5*7*24*60 {
- t.Errorf("5w ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("5M")
- if ttl.Minutes() != 5*31*24*60 {
- t.Errorf("5M ttl:%v", ttl)
- }
-
- ttl, _ = ReadTTL("5y")
- if ttl.Minutes() != 5*365*24*60 {
- t.Errorf("5y ttl:%v", ttl)
- }
-
- output := make([]byte, 2)
- ttl.ToBytes(output)
- ttl2 := LoadTTLFromBytes(output)
- if ttl.Minutes() != ttl2.Minutes() {
- t.Errorf("ttl:%v ttl2:%v", ttl, ttl2)
- }
-
- ttl3 := LoadTTLFromUint32(ttl.ToUint32())
- if ttl.Minutes() != ttl3.Minutes() {
- t.Errorf("ttl:%v ttl3:%v", ttl, ttl3)
- }
-
-}
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
deleted file mode 100644
index 5ba8d575c..000000000
--- a/go/storage/volume_vacuum.go
+++ /dev/null
@@ -1,93 +0,0 @@
-package storage
-
-import (
- "fmt"
- "os"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/glog"
-)
-
-func (v *Volume) garbageLevel() float64 {
- // guard against an empty volume to avoid a NaN from 0/0
- if v.ContentSize() == 0 {
- return 0
- }
- return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
-}
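-
-// Worked example (illustrative): a volume with ContentSize 100MB of which
-// 30MB belongs to deleted needles has garbageLevel 0.3, so a garbage
-// threshold below 0.3 would trigger compaction in CheckCompactVolume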
-
-func (v *Volume) Compact() error {
- glog.V(3).Infof("Compacting ...")
- //no need to lock for copy on write
- //v.accessLock.Lock()
- //defer v.accessLock.Unlock()
- //glog.V(3).Infof("Got Compaction lock...")
-
- filePath := v.FileName()
- glog.V(3).Infof("creating copies for volume %d ...", v.Id)
- return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
-}
-func (v *Volume) commitCompact() error {
- glog.V(3).Infof("Committing vacuuming...")
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- glog.V(3).Infof("Got Committing lock...")
- v.nm.Close()
- _ = v.dataFile.Close()
- var e error
- if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
- return e
- }
- if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
- return e
- }
- //glog.V(3).Infof("Pretending to be vacuuming...")
- //time.Sleep(20 * time.Second)
- glog.V(3).Infof("Loading Commit file...")
- if e = v.load(true, false, v.needleMapKind); e != nil {
- return e
- }
- return nil
-}
-
-func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
- var (
- dst, idx *os.File
- )
- if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer dst.Close()
-
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer idx.Close()
-
- nm := NewNeedleMap(idx)
- newOffset := int64(SuperBlockSize)
-
- now := uint64(time.Now().Unix())
-
- err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind,
- func(superBlock SuperBlock) error {
- superBlock.CompactRevision++
- _, err = dst.Write(superBlock.Bytes())
- return err
- }, true, func(n *Needle, offset int64) error {
- if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
- return nil
- }
- nv, ok := v.nm.Get(n.Id)
- glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
- if err = nm.Put(n.Id, uint32(newOffset/NeedlePaddingSize), n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
- }
- if _, err = n.Append(dst, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
- }
- newOffset += n.DiskSize()
- glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
- }
- return nil
- })
-
- return
-}
diff --git a/go/storage/volume_version.go b/go/storage/volume_version.go
deleted file mode 100644
index 2e9f58aa2..000000000
--- a/go/storage/volume_version.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package storage
-
-type Version uint8
-
-const (
- Version1 = Version(1)
- Version2 = Version(2)
- CurrentVersion = Version2
-)