Diffstat (limited to 'go/storage')
29 files changed, 0 insertions, 3246 deletions
diff --git a/go/storage/compact_map.go b/go/storage/compact_map.go deleted file mode 100644 index d4438d044..000000000 --- a/go/storage/compact_map.go +++ /dev/null @@ -1,207 +0,0 @@ -package storage - -import ( - "strconv" - "sync" -) - -type NeedleValue struct { - Key Key - Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size uint32 `comment:"Size of the data portion"` -} - -const ( - batch = 100000 -) - -type Key uint64 - -func (k Key) String() string { - return strconv.FormatUint(uint64(k), 10) -} - -type CompactSection struct { - sync.RWMutex - values []NeedleValue - overflow map[Key]NeedleValue - start Key - end Key - counter int -} - -func NewCompactSection(start Key) *CompactSection { - return &CompactSection{ - values: make([]NeedleValue, batch), - overflow: make(map[Key]NeedleValue), - start: start, - } -} - -//return old entry size -func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { - ret := uint32(0) - if key > cs.end { - cs.end = key - } - cs.Lock() - if i := cs.binarySearchValues(key); i >= 0 { - ret = cs.values[i].Size - //println("key", key, "old size", ret) - cs.values[i].Offset, cs.values[i].Size = offset, size - } else { - needOverflow := cs.counter >= batch - needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key - if needOverflow { - //println("start", cs.start, "counter", cs.counter, "key", key) - if oldValue, found := cs.overflow[key]; found { - ret = oldValue.Size - } - cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size} - } else { - p := &cs.values[cs.counter] - p.Key, p.Offset, p.Size = key, offset, size - //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) - cs.counter++ - } - } - cs.Unlock() - return ret -} - -//return old entry size -func (cs *CompactSection) Delete(key Key) uint32 { - cs.Lock() - ret := uint32(0) - if i := cs.binarySearchValues(key); i >= 0 { - if cs.values[i].Size > 0 { - ret = cs.values[i].Size - cs.values[i].Size = 0 - } - } - if v, found := cs.overflow[key]; found { - delete(cs.overflow, key) - ret = v.Size - } - cs.Unlock() - return ret -} -func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { - cs.RLock() - if v, ok := cs.overflow[key]; ok { - cs.RUnlock() - return &v, true - } - if i := cs.binarySearchValues(key); i >= 0 { - cs.RUnlock() - return &cs.values[i], true - } - cs.RUnlock() - return nil, false -} -func (cs *CompactSection) binarySearchValues(key Key) int { - l, h := 0, cs.counter-1 - if h >= 0 && cs.values[h].Key < key { - return -2 - } - //println("looking for key", key) - for l <= h { - m := (l + h) / 2 - //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size) - if cs.values[m].Key < key { - l = m + 1 - } else if key < cs.values[m].Key { - h = m - 1 - } else { - //println("found", m) - return m - } - } - return -1 -} - -//This map assumes mostly inserting increasing keys -type CompactMap struct { - list []*CompactSection -} - -func NewCompactMap() CompactMap { - return CompactMap{} -} - -func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { - x := cm.binarySearchCompactSection(key) - if x < 0 { - //println(x, "creating", len(cm.list), "section, starting", key) - cm.list = append(cm.list, NewCompactSection(key)) - x = len(cm.list) - 1 - //keep compact section sorted by start - for x > 0 { - if cm.list[x-1].start > cm.list[x].start { - cm.list[x-1], cm.list[x] = cm.list[x], cm.list[x-1] - x = x - 1 - } else { - break - } - } - } - return 
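// Hypothetical illustration of the CompactSection behavior above (assumes only
// the declarations in compact_map.go; not from the original files): in-order
// keys fill the sorted values slice, while an out-of-order or past-capacity
// key lands in the overflow map.
cs := NewCompactSection(Key(0))
cs.Set(Key(1), 8, 100)  // appended to cs.values
cs.Set(Key(5), 16, 100) // still ascending, appended to cs.values
cs.Set(Key(3), 24, 100) // arrives out of order -> stored in cs.overflow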
cm.list[x].Set(key, offset, size) -} -func (cm *CompactMap) Delete(key Key) uint32 { - x := cm.binarySearchCompactSection(key) - if x < 0 { - return uint32(0) - } - return cm.list[x].Delete(key) -} -func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { - x := cm.binarySearchCompactSection(key) - if x < 0 { - return nil, false - } - return cm.list[x].Get(key) -} -func (cm *CompactMap) binarySearchCompactSection(key Key) int { - l, h := 0, len(cm.list)-1 - if h < 0 { - return -5 - } - if cm.list[h].start <= key { - if cm.list[h].counter < batch || key <= cm.list[h].end { - return h - } - return -4 - } - for l <= h { - m := (l + h) / 2 - if key < cm.list[m].start { - h = m - 1 - } else { // cm.list[m].start <= key - if cm.list[m+1].start <= key { - l = m + 1 - } else { - return m - } - } - } - return -3 -} - -// Visit visits all entries or stop if any error when visiting -func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { - for _, cs := range cm.list { - for _, v := range cs.overflow { - if err := visit(v); err != nil { - return err - } - } - for _, v := range cs.values { - if _, found := cs.overflow[v.Key]; !found { - if err := visit(v); err != nil { - return err - } - } - } - } - return nil -} diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go deleted file mode 100644 index 1b429f263..000000000 --- a/go/storage/compact_map_perf_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package storage - -import ( - "log" - "os" - "testing" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -func TestMemoryUsage(t *testing.T) { - - indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - -} - -func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - glog.V(0).Infoln("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } - - count, e = file.Read(bytes) - } - return m -} diff --git a/go/storage/compact_map_test.go b/go/storage/compact_map_test.go deleted file mode 100644 index 1ccb48edb..000000000 --- a/go/storage/compact_map_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package storage - -import ( - "testing" -) - -func TestIssue52(t *testing.T) { - m := NewCompactMap() - m.Set(Key(10002), 10002, 10002) - if element, ok := m.Get(Key(10002)); ok { - println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) - } - m.Set(Key(10001), 10001, 10001) - if element, ok := m.Get(Key(10002)); ok { - println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) - } else { - t.Fatal("key 10002 missing after setting 10001") - } -} - -func TestXYZ(t *testing.T) { - m := NewCompactMap() - for i := uint32(0); i < 100*batch; i += 2 { - m.Set(Key(i), i, i) - } - - for i := uint32(0); i < 100*batch; i += 37 { - m.Delete(Key(i)) - } - - for i := uint32(0); i < 10*batch; i += 3 { - m.Set(Key(i), i+11, i+5) - } - - // for i := uint32(0); i < 100; i++ { - // if v := m.Get(Key(i)); v != nil { - // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size) - // } - // } - - for i := uint32(0); i < 10*batch; i++ 
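// Hypothetical sketch (not from the original files) of iterating a CompactMap
// m built as above: for each section, Visit yields the overflow entries first,
// then the sorted entries whose keys are not shadowed by overflow.
_ = m.Visit(func(nv NeedleValue) error {
	// nv.Offset is stored in NeedlePaddingSize (8-byte) units
	return nil
})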
{ - v, ok := m.Get(Key(i)) - if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } - if v.Size != i+5 { - t.Fatal("key", i, "size", v.Size) - } - } else if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } - } else if i%2 == 0 { - if v.Size != i { - t.Fatal("key", i, "size", v.Size) - } - } - } - - for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) - if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } - } else if i%2 == 0 { - if v == nil { - t.Fatal("key", i, "missing") - } - if v.Size != i { - t.Fatal("key", i, "size", v.Size) - } - } - } - -} diff --git a/go/storage/crc.go b/go/storage/crc.go deleted file mode 100644 index 21e384854..000000000 --- a/go/storage/crc.go +++ /dev/null @@ -1,30 +0,0 @@ -package storage - -import ( - "fmt" - "github.com/klauspost/crc32" - - "github.com/chrislusf/seaweedfs/go/util" -) - -var table = crc32.MakeTable(crc32.Castagnoli) - -type CRC uint32 - -func NewCRC(b []byte) CRC { - return CRC(0).Update(b) -} - -func (c CRC) Update(b []byte) CRC { - return CRC(crc32.Update(uint32(c), table, b)) -} - -func (c CRC) Value() uint32 { - return uint32(c>>15|c<<17) + 0xa282ead8 -} - -func (n *Needle) Etag() string { - bits := make([]byte, 4) - util.Uint32toBytes(bits, uint32(n.Checksum)) - return fmt.Sprintf("\"%x\"", bits) -} diff --git a/go/storage/disk_location.go b/go/storage/disk_location.go deleted file mode 100644 index 8cca1a68e..000000000 --- a/go/storage/disk_location.go +++ /dev/null @@ -1,73 +0,0 @@ -package storage - -import ( - "io/ioutil" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -type DiskLocation struct { - Directory string - MaxVolumeCount int - volumes map[VolumeId]*Volume -} - -func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} - location.volumes = make(map[VolumeId]*Volume) - return location -} - -func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { - - if dirs, err := ioutil.ReadDir(l.Directory); err == nil { - for _, dir := range dirs { - name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".dat") { - collection := "" - base := name[:len(name)-len(".dat")] - i := strings.LastIndex(base, "_") - if i > 0 { - collection, base = base[0:i], base[i+1:] - } - if vid, err := NewVolumeId(base); err == nil { - if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { - l.volumes[vid] = v - glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) - } else { - glog.V(0).Infof("new volume %s error %s", name, e) - } - } - } - } - } - } - glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) -} - -func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { - for k, v := range l.volumes { - if v.Collection == collection { - e = l.deleteVolumeById(k) - if e != nil { - return - } - } - } - return -} - -func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { - v, ok := l.volumes[vid] - if !ok { - return - } - e = v.Destroy() - if e != nil { - return - } - delete(l.volumes, vid) - return -} diff --git a/go/storage/file_id.go b/go/storage/file_id.go deleted file mode 100644 index 64b61ba89..000000000 --- a/go/storage/file_id.go +++ 
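// Hypothetical usage of the CRC type above (not from the original files): the
// checksum is CRC-32C (Castagnoli), and Value() applies the same rotate-and-add
// masking as LevelDB's crc32c, so the stored value is never a plain CRC of the
// bytes it covers.
c := NewCRC([]byte("hello"))
c = c.Update([]byte(" world")) // incremental update over a second chunk
stored := c.Value()            // ((c>>15)|(c<<17)) + 0xa282ead8
_ = stored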
/dev/null @@ -1,43 +0,0 @@ -package storage - -import ( - "encoding/hex" - "errors" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -type FileId struct { - VolumeId VolumeId - Key uint64 - Hashcode uint32 -} - -func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId { - return &FileId{VolumeId: VolumeId, Key: n.Id, Hashcode: n.Cookie} -} -func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId { - return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} -} -func ParseFileId(fid string) (*FileId, error) { - a := strings.Split(fid, ",") - if len(a) != 2 { - glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a)) - return nil, errors.New("Invalid fid " + fid) - } - vid_string, key_hash_string := a[0], a[1] - volumeId, _ := NewVolumeId(vid_string) - key, hash, e := ParseKeyHash(key_hash_string) - return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e -} -func (n *FileId) String() string { - bytes := make([]byte, 12) - util.Uint64toBytes(bytes[0:8], n.Key) - util.Uint32toBytes(bytes[8:12], n.Hashcode) - nonzero_index := 0 - for ; bytes[nonzero_index] == 0; nonzero_index++ { - } - return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:]) -} diff --git a/go/storage/needle.go b/go/storage/needle.go deleted file mode 100644 index 8ab76c0f3..000000000 --- a/go/storage/needle.go +++ /dev/null @@ -1,231 +0,0 @@ -package storage - -import ( - "fmt" - "io/ioutil" - "mime" - "net/http" - "path" - "strconv" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/images" - "github.com/chrislusf/seaweedfs/go/operation" -) - -const ( - NeedleHeaderSize = 16 //should never change this - NeedlePaddingSize = 8 - NeedleChecksumSize = 4 - MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 -) - -/* -* A Needle means a uploaded and stored file. -* Needle file size is limited to 4GB for now. 
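// Hypothetical round-trip of the FileId type from file_id.go above (not from
// the original files): ParseFileId splits "volumeId,keyHash" on the comma, and
// ParseKeyHash (defined below in needle.go) keeps the last 8 hex digits as the
// cookie.
//
//	fid, _ := ParseFileId("3,01637037d6") // VolumeId 3, Key 0x01, Hashcode 0x637037d6
//	_ = fid.String()                      // "3,01637037d6" again; leading zero bytes are trimmed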
- */ -type Needle struct { - Cookie uint32 `comment:"random number to mitigate brute force lookups"` - Id uint64 `comment:"needle id"` - Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` - - DataSize uint32 `comment:"Data size"` //version2 - Data []byte `comment:"The actual file data"` - Flags byte `comment:"boolean flags"` //version2 - NameSize uint8 //version2 - Name []byte `comment:"maximum 256 characters"` //version2 - MimeSize uint8 //version2 - Mime []byte `comment:"maximum 256 characters"` //version2 - LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk - Ttl *TTL - - Checksum CRC `comment:"CRC32 to check integrity"` - Padding []byte `comment:"Aligned to 8 bytes"` - - rawBlock *Block // underlying supporing []byte, fetched and released into a pool -} - -func (n *Needle) String() (str string) { - str = fmt.Sprintf("Cookie:%d, Id:%d, Size:%d, DataSize:%d, Name: %s, Mime: %s", n.Cookie, n.Id, n.Size, n.DataSize, n.Name, n.Mime) - return -} - -func ParseUpload(r *http.Request) ( - fileName string, data []byte, mimeType string, isGzipped bool, - modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { - form, fe := r.MultipartReader() - if fe != nil { - glog.V(0).Infoln("MultipartReader [ERROR]", fe) - e = fe - return - } - - //first multi-part item - part, fe := form.NextPart() - if fe != nil { - glog.V(0).Infoln("Reading Multi part [ERROR]", fe) - e = fe - return - } - - fileName = part.FileName() - if fileName != "" { - fileName = path.Base(fileName) - } - - data, e = ioutil.ReadAll(part) - if e != nil { - glog.V(0).Infoln("Reading Content [ERROR]", e) - return - } - - //if the filename is empty string, do a search on the other multi-part items - for fileName == "" { - part2, fe := form.NextPart() - if fe != nil { - break // no more or on error, just safely break - } - - fName := part2.FileName() - - //found the first <file type> multi-part has filename - if fName != "" { - data2, fe2 := ioutil.ReadAll(part2) - if fe2 != nil { - glog.V(0).Infoln("Reading Content [ERROR]", fe2) - e = fe2 - return - } - - //update - data = data2 - fileName = path.Base(fName) - break - } - } - - dotIndex := strings.LastIndex(fileName, ".") - ext, mtype := "", "" - if dotIndex > 0 { - ext = strings.ToLower(fileName[dotIndex:]) - mtype = mime.TypeByExtension(ext) - } - contentType := part.Header.Get("Content-Type") - if contentType != "" && mtype != contentType { - mimeType = contentType //only return mime type if not deductable - mtype = contentType - } - if part.Header.Get("Content-Encoding") == "gzip" { - isGzipped = true - } else if operation.IsGzippable(ext, mtype) { - if data, e = operation.GzipData(data); e != nil { - return - } - isGzipped = true - } - if ext == ".gz" { - isGzipped = true - } - if strings.HasSuffix(fileName, ".gz") && - !strings.HasSuffix(fileName, ".tar.gz") { - fileName = fileName[:len(fileName)-3] - } - modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) - ttl, _ = ReadTTL(r.FormValue("ttl")) - isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) - return -} -func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { - fname, mimeType, isGzipped, isChunkedFile := "", "", false, false - n = new(Needle) - fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) - if e != nil { - return - } - if len(fname) < 256 { - n.Name = []byte(fname) - n.SetHasName() - } - if len(mimeType) < 256 { - n.Mime = []byte(mimeType) - n.SetHasMime() - } - if isGzipped 
{ - n.SetGzipped() - } - if n.LastModified == 0 { - n.LastModified = uint64(time.Now().Unix()) - } - n.SetHasLastModifiedDate() - if n.Ttl != EMPTY_TTL { - n.SetHasTtl() - } - - if isChunkedFile { - n.SetIsChunkManifest() - } - - if fixJpgOrientation { - loweredName := strings.ToLower(fname) - if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") { - n.Data = images.FixJpgOrientation(n.Data) - } - } - - n.Checksum = NewCRC(n.Data) - - commaSep := strings.LastIndex(r.URL.Path, ",") - dotSep := strings.LastIndex(r.URL.Path, ".") - fid := r.URL.Path[commaSep+1:] - if dotSep > 0 { - fid = r.URL.Path[commaSep+1 : dotSep] - } - - e = n.ParsePath(fid) - - return -} -func (n *Needle) ParsePath(fid string) (err error) { - length := len(fid) - if length <= 8 { - return fmt.Errorf("Invalid fid: %s", fid) - } - delta := "" - deltaIndex := strings.LastIndex(fid, "_") - if deltaIndex > 0 { - fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] - } - n.Id, n.Cookie, err = ParseKeyHash(fid) - if err != nil { - return err - } - if delta != "" { - if d, e := strconv.ParseUint(delta, 10, 64); e == nil { - n.Id += d - } else { - return e - } - } - return err -} - -func ParseKeyHash(key_hash_string string) (uint64, uint32, error) { - if len(key_hash_string) <= 8 { - return 0, 0, fmt.Errorf("KeyHash is too short.") - } - if len(key_hash_string) > 24 { - return 0, 0, fmt.Errorf("KeyHash is too long.") - } - split := len(key_hash_string) - 8 - key, err := strconv.ParseUint(key_hash_string[:split], 16, 64) - if err != nil { - return 0, 0, fmt.Errorf("Parse key error: %v", err) - } - hash, err := strconv.ParseUint(key_hash_string[split:], 16, 32) - if err != nil { - return 0, 0, fmt.Errorf("Parse hash error: %v", err) - } - return key, uint32(hash), nil -} diff --git a/go/storage/needle_byte_cache.go b/go/storage/needle_byte_cache.go deleted file mode 100644 index 5db0f8895..000000000 --- a/go/storage/needle_byte_cache.go +++ /dev/null @@ -1,75 +0,0 @@ -package storage - -import ( - "fmt" - "os" - "sync/atomic" - - "github.com/hashicorp/golang-lru" - - "github.com/chrislusf/seaweedfs/go/util" -) - -var ( - bytesCache *lru.Cache - bytesPool *util.BytesPool -) - -/* -There are one level of caching, and one level of pooling. - -In pooling, all []byte are fetched and returned to the pool bytesPool. 
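// Hypothetical sketch of the lifecycle implemented below (not from the
// original files): a freshly read Block starts with refCount == 2, one
// reference held by the LRU cache entry and one by the needle using the bytes;
// the last release returns the buffer to bytesPool. (datFile stands for any
// open .dat *os.File.)
//
//	data, block, err := getBytesForFileBlock(datFile, offset, readSize)
//	// ... parse the needle out of data, handle err ...
//	block.decreaseReference() // reader done; cache eviction drops the other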
- -In caching, the string~[]byte mapping is cached -*/ -func init() { - bytesPool = util.NewBytesPool() - bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) { - value.(*Block).decreaseReference() - }) -} - -type Block struct { - Bytes []byte - refCount int32 -} - -func (block *Block) decreaseReference() { - if atomic.AddInt32(&block.refCount, -1) == 0 { - bytesPool.Put(block.Bytes) - } -} -func (block *Block) increaseReference() { - atomic.AddInt32(&block.refCount, 1) -} - -// get bytes from the LRU cache of []byte first, then from the bytes pool -// when []byte in LRU cache is evicted, it will be put back to the bytes pool -func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) { - // check cache, return if found - cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) - if obj, found := bytesCache.Get(cacheKey); found { - block = obj.(*Block) - block.increaseReference() - dataSlice = block.Bytes[0:readSize] - return dataSlice, block, nil - } - - // get the []byte from pool - b := bytesPool.Get(readSize) - // refCount = 2, one by the bytesCache, one by the actual needle object - block = &Block{Bytes: b, refCount: 2} - dataSlice = block.Bytes[0:readSize] - _, err = r.ReadAt(dataSlice, offset) - bytesCache.Add(cacheKey, block) - return dataSlice, block, err -} - -func (n *Needle) ReleaseMemory() { - if n.rawBlock != nil { - n.rawBlock.decreaseReference() - } -} -func ReleaseBytes(b []byte) { - bytesPool.Put(b) -} diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go deleted file mode 100644 index 638a9b4af..000000000 --- a/go/storage/needle_map.go +++ /dev/null @@ -1,123 +0,0 @@ -package storage - -import ( - "fmt" - "io/ioutil" - "os" - "sync" - - "github.com/chrislusf/seaweedfs/go/util" -) - -type NeedleMapType int - -const ( - NeedleMapInMemory NeedleMapType = iota - NeedleMapLevelDb - NeedleMapBoltDb -) - -type NeedleMapper interface { - Put(key uint64, offset uint32, size uint32) error - Get(key uint64) (element *NeedleValue, ok bool) - Delete(key uint64) error - Close() - Destroy() error - ContentSize() uint64 - DeletedSize() uint64 - FileCount() int - DeletedCount() int - MaxFileKey() uint64 - IndexFileSize() uint64 - IndexFileContent() ([]byte, error) - IndexFileName() string -} - -type baseNeedleMapper struct { - indexFile *os.File - indexFileAccessLock sync.Mutex - - mapMetric -} - -func (nm *baseNeedleMapper) IndexFileSize() uint64 { - stat, err := nm.indexFile.Stat() - if err == nil { - return uint64(stat.Size()) - } - return 0 -} - -func (nm *baseNeedleMapper) IndexFileName() string { - return nm.indexFile.Name() -} - -func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { - key = util.BytesToUint64(bytes[:8]) - offset = util.BytesToUint32(bytes[8:12]) - size = util.BytesToUint32(bytes[12:16]) - return -} -func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], offset) - util.Uint32toBytes(bytes[12:16], size) - - nm.indexFileAccessLock.Lock() - defer nm.indexFileAccessLock.Unlock() - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot seek end of indexfile %s: %v", - nm.indexFile.Name(), err) - } - _, err := nm.indexFile.Write(bytes) - return err -} -func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) { - nm.indexFileAccessLock.Lock() - defer nm.indexFileAccessLock.Unlock() - return 
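// Hypothetical example (not from the original files) of the .idx record format
// used by idxFileEntry and appendToIndexFile above: each record is exactly
// 16 bytes -- key (8 bytes), offset (4 bytes, in NeedlePaddingSize units),
// size (4 bytes); offset == 0 marks a deletion.
row := make([]byte, 16)
util.Uint64toBytes(row[0:8], 0x4ed4) // needle key
util.Uint32toBytes(row[8:12], 512)   // offset -> byte position 512*8 in the .dat file
util.Uint32toBytes(row[12:16], 1024) // needle size in bytes
key, offset, size := idxFileEntry(row)
_, _, _ = key, offset, size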
ioutil.ReadFile(nm.indexFile.Name()) -} - -type mapMetric struct { - indexFile *os.File - - DeletionCounter int `json:"DeletionCounter"` - FileCounter int `json:"FileCounter"` - DeletionByteCounter uint64 `json:"DeletionByteCounter"` - FileByteCounter uint64 `json:"FileByteCounter"` - MaximumFileKey uint64 `json:"MaxFileKey"` -} - -func (mm *mapMetric) logDelete(deletedByteCount uint32) { - mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) - mm.DeletionCounter++ -} - -func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { - if key > mm.MaximumFileKey { - mm.MaximumFileKey = key - } - mm.FileCounter++ - mm.FileByteCounter = mm.FileByteCounter + uint64(newSize) - if oldSize > 0 { - mm.DeletionCounter++ - mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) - } -} - -func (mm mapMetric) ContentSize() uint64 { - return mm.FileByteCounter -} -func (mm mapMetric) DeletedSize() uint64 { - return mm.DeletionByteCounter -} -func (mm mapMetric) FileCount() int { - return mm.FileCounter -} -func (mm mapMetric) DeletedCount() int { - return mm.DeletionCounter -} -func (mm mapMetric) MaxFileKey() uint64 { - return mm.MaximumFileKey -} diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go deleted file mode 100644 index e95c016bb..000000000 --- a/go/storage/needle_map_boltdb.go +++ /dev/null @@ -1,165 +0,0 @@ -package storage - -import ( - "fmt" - "os" - - "github.com/boltdb/bolt" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -type BoltDbNeedleMap struct { - dbFileName string - db *bolt.DB - baseNeedleMapper -} - -var boltdbBucket = []byte("weed") - -func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { - m = &BoltDbNeedleMap{dbFileName: dbFileName} - m.indexFile = indexFile - if !isBoltDbFresh(dbFileName, indexFile) { - glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) - generateBoltDbFile(dbFileName, indexFile) - glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) - } - glog.V(1).Infof("Opening %s...", dbFileName) - if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil { - return - } - glog.V(1).Infof("Loading %s...", indexFile.Name()) - nm, indexLoadError := LoadNeedleMap(indexFile) - if indexLoadError != nil { - return nil, indexLoadError - } - m.mapMetric = nm.mapMetric - return -} - -func isBoltDbFresh(dbFileName string, indexFile *os.File) bool { - // normally we always write to index file first - dbLogFile, err := os.Open(dbFileName) - if err != nil { - return false - } - defer dbLogFile.Close() - dbStat, dbStatErr := dbLogFile.Stat() - indexStat, indexStatErr := indexFile.Stat() - if dbStatErr != nil || indexStatErr != nil { - glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) - return false - } - - return dbStat.ModTime().After(indexStat.ModTime()) -} - -func generateBoltDbFile(dbFileName string, indexFile *os.File) error { - db, err := bolt.Open(dbFileName, 0644, nil) - if err != nil { - return err - } - defer db.Close() - return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { - if offset > 0 { - boltDbWrite(db, key, offset, size) - } else { - boltDbDelete(db, key) - } - return nil - }) -} - -func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - bytes := make([]byte, 8) - var data []byte - util.Uint64toBytes(bytes, key) - err := m.db.View(func(tx *bolt.Tx) error { - bucket := 
tx.Bucket(boltdbBucket) - if bucket == nil { - return fmt.Errorf("Bucket %q not found!", boltdbBucket) - } - - data = bucket.Get(bytes) - return nil - }) - - if err != nil || len(data) != 8 { - return nil, false - } - offset := util.BytesToUint32(data[0:4]) - size := util.BytesToUint32(data[4:8]) - return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true -} - -func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { - var oldSize uint32 - if oldNeedle, ok := m.Get(key); ok { - oldSize = oldNeedle.Size - } - m.logPut(key, oldSize, size) - // write to index file first - if err := m.appendToIndexFile(key, offset, size); err != nil { - return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) - } - return boltDbWrite(m.db, key, offset, size) -} - -func boltDbWrite(db *bolt.DB, - key uint64, offset uint32, size uint32) error { - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], offset) - util.Uint32toBytes(bytes[12:16], size) - return db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) - if err != nil { - return err - } - - err = bucket.Put(bytes[0:8], bytes[8:16]) - if err != nil { - return err - } - return nil - }) -} -func boltDbDelete(db *bolt.DB, key uint64) error { - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) - return db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) - if err != nil { - return err - } - - err = bucket.Delete(bytes) - if err != nil { - return err - } - return nil - }) -} - -func (m *BoltDbNeedleMap) Delete(key uint64) error { - if oldNeedle, ok := m.Get(key); ok { - m.logDelete(oldNeedle.Size) - } - // write to index file first - if err := m.appendToIndexFile(key, 0, 0); err != nil { - return err - } - return boltDbDelete(m.db, key) -} - -func (m *BoltDbNeedleMap) Close() { - m.db.Close() -} - -func (m *BoltDbNeedleMap) Destroy() error { - m.Close() - os.Remove(m.indexFile.Name()) - return os.Remove(m.dbFileName) -} diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go deleted file mode 100644 index 47f63e3ae..000000000 --- a/go/storage/needle_map_leveldb.go +++ /dev/null @@ -1,134 +0,0 @@ -package storage - -import ( - "fmt" - "os" - "path/filepath" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" - "github.com/syndtr/goleveldb/leveldb" -) - -type LevelDbNeedleMap struct { - dbFileName string - db *leveldb.DB - baseNeedleMapper -} - -func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { - m = &LevelDbNeedleMap{dbFileName: dbFileName} - m.indexFile = indexFile - if !isLevelDbFresh(dbFileName, indexFile) { - glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) - generateLevelDbFile(dbFileName, indexFile) - glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) - } - glog.V(1).Infof("Opening %s...", dbFileName) - if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil { - return - } - glog.V(1).Infof("Loading %s...", indexFile.Name()) - nm, indexLoadError := LoadNeedleMap(indexFile) - if indexLoadError != nil { - return nil, indexLoadError - } - m.mapMetric = nm.mapMetric - return -} - -func isLevelDbFresh(dbFileName string, indexFile *os.File) bool { - // normally we always write to index file first - dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG")) - if err != nil { - return false - } - defer 
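// Hypothetical note on the ordering above (not from the original files): Put
// and Delete always append to the .idx log before touching the database, so
// after a crash the .idx file is at least as new as the db and
// isBoltDbFresh/isLevelDbFresh report stale, forcing a rebuild from the log.
//
//	m, err := NewBoltDbNeedleMap("1.bdb", idxFile) // regenerates 1.bdb if stale
//	if err == nil {
//		err = m.Put(0x4ed4, 512, 1024) // .idx append first, then bolt bucket
//	}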
dbLogFile.Close() - dbStat, dbStatErr := dbLogFile.Stat() - indexStat, indexStatErr := indexFile.Stat() - if dbStatErr != nil || indexStatErr != nil { - glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) - return false - } - - return dbStat.ModTime().After(indexStat.ModTime()) -} - -func generateLevelDbFile(dbFileName string, indexFile *os.File) error { - db, err := leveldb.OpenFile(dbFileName, nil) - if err != nil { - return err - } - defer db.Close() - return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { - if offset > 0 { - levelDbWrite(db, key, offset, size) - } else { - levelDbDelete(db, key) - } - return nil - }) -} - -func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) - data, err := m.db.Get(bytes, nil) - if err != nil || len(data) != 8 { - return nil, false - } - offset := util.BytesToUint32(data[0:4]) - size := util.BytesToUint32(data[4:8]) - return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true -} - -func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { - var oldSize uint32 - if oldNeedle, ok := m.Get(key); ok { - oldSize = oldNeedle.Size - } - m.logPut(key, oldSize, size) - // write to index file first - if err := m.appendToIndexFile(key, offset, size); err != nil { - return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) - } - return levelDbWrite(m.db, key, offset, size) -} - -func levelDbWrite(db *leveldb.DB, - key uint64, offset uint32, size uint32) error { - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], offset) - util.Uint32toBytes(bytes[12:16], size) - if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil { - return fmt.Errorf("failed to write leveldb: %v", err) - } - return nil -} -func levelDbDelete(db *leveldb.DB, key uint64) error { - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) - return db.Delete(bytes, nil) -} - -func (m *LevelDbNeedleMap) Delete(key uint64) error { - if oldNeedle, ok := m.Get(key); ok { - m.logDelete(oldNeedle.Size) - } - // write to index file first - if err := m.appendToIndexFile(key, 0, 0); err != nil { - return err - } - return levelDbDelete(m.db, key) -} - -func (m *LevelDbNeedleMap) Close() { - m.db.Close() -} - -func (m *LevelDbNeedleMap) Destroy() error { - m.Close() - os.Remove(m.indexFile.Name()) - return os.Remove(m.dbFileName) -} diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go deleted file mode 100644 index 2b1fc1b54..000000000 --- a/go/storage/needle_map_memory.go +++ /dev/null @@ -1,106 +0,0 @@ -package storage - -import ( - "io" - "os" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -type NeedleMap struct { - m CompactMap - - baseNeedleMapper -} - -func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ - m: NewCompactMap(), - } - nm.indexFile = file - return nm -} - -const ( - RowsToRead = 1024 -) - -func LoadNeedleMap(file *os.File) (*NeedleMap, error) { - nm := NewNeedleMap(file) - e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { - if key > nm.MaximumFileKey { - nm.MaximumFileKey = key - } - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = 
nm.DeletionByteCounter + uint64(oldSize) - } - } else { - oldSize := nm.m.Delete(Key(key)) - glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - return nil - }) - glog.V(1).Infoln("max file key:", nm.MaximumFileKey) - return nm, e -} - -// walks through the index file, calls fn function with each key, offset, size -// stops with the error returned by the fn function -func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { - var readerOffset int64 - bytes := make([]byte, 16*RowsToRead) - count, e := r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - readerOffset += int64(count) - var ( - key uint64 - offset, size uint32 - i int - ) - - for count > 0 && e == nil || e == io.EOF { - for i = 0; i+16 <= count; i += 16 { - key, offset, size = idxFileEntry(bytes[i : i+16]) - if e = fn(key, offset, size); e != nil { - return e - } - } - if e == io.EOF { - return nil - } - count, e = r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - readerOffset += int64(count) - } - return e -} - -func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { - oldSize := nm.m.Set(Key(key), offset, size) - nm.logPut(key, oldSize, size) - return nm.appendToIndexFile(key, offset, size) -} -func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) - return -} -func (nm *NeedleMap) Delete(key uint64) error { - deletedBytes := nm.m.Delete(Key(key)) - nm.logDelete(deletedBytes) - return nm.appendToIndexFile(key, 0, 0) -} -func (nm *NeedleMap) Close() { - _ = nm.indexFile.Close() -} -func (nm *NeedleMap) Destroy() error { - nm.Close() - return os.Remove(nm.indexFile.Name()) -} diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go deleted file mode 100644 index fcca2469c..000000000 --- a/go/storage/needle_read_write.go +++ /dev/null @@ -1,291 +0,0 @@ -package storage - -import ( - "errors" - "fmt" - "io" - "os" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" -) - -const ( - FlagGzip = 0x01 - FlagHasName = 0x02 - FlagHasMime = 0x04 - FlagHasLastModifiedDate = 0x08 - FlagHasTtl = 0x10 - FlagIsChunkManifest = 0x80 - LastModifiedBytesLength = 5 - TtlBytesLength = 2 -) - -func (n *Needle) DiskSize() int64 { - padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize) - return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize -} -func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { - if s, ok := w.(io.Seeker); ok { - if end, e := s.Seek(0, 1); e == nil { - defer func(s io.Seeker, off int64) { - if err != nil { - if _, e = s.Seek(off, 0); e != nil { - glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e) - } - } - }(s, end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) - return - } - } - switch version { - case Version1: - header := make([]byte, NeedleHeaderSize) - util.Uint32toBytes(header[0:4], n.Cookie) - util.Uint64toBytes(header[4:12], n.Id) - n.Size = uint32(len(n.Data)) - size = n.Size - util.Uint32toBytes(header[12:16], n.Size) - if _, err = w.Write(header); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } 
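// Worked example of the alignment step just below (hypothetical numbers, not
// from the original files): with NeedleHeaderSize=16, NeedleChecksumSize=4 and
// NeedlePaddingSize=8, a 100-byte body gives (16+100+4)%8 == 0, so padding
// comes out as a full 8 bytes -- the formula never yields 0 -- and
// DiskSize() == 16 + 100 + 8 + 4 == 128.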
- padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) - util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return - case Version2: - header := make([]byte, NeedleHeaderSize) - util.Uint32toBytes(header[0:4], n.Cookie) - util.Uint64toBytes(header[4:12], n.Id) - n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) - if n.DataSize > 0 { - n.Size = 4 + n.DataSize + 1 - if n.HasName() { - n.Size = n.Size + 1 + uint32(n.NameSize) - } - if n.HasMime() { - n.Size = n.Size + 1 + uint32(n.MimeSize) - } - if n.HasLastModifiedDate() { - n.Size = n.Size + LastModifiedBytesLength - } - if n.HasTtl() { - n.Size = n.Size + TtlBytesLength - } - } else { - n.Size = 0 - } - size = n.DataSize - util.Uint32toBytes(header[12:16], n.Size) - if _, err = w.Write(header); err != nil { - return - } - if n.DataSize > 0 { - util.Uint32toBytes(header[0:4], n.DataSize) - if _, err = w.Write(header[0:4]); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } - util.Uint8toBytes(header[0:1], n.Flags) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if n.HasName() { - util.Uint8toBytes(header[0:1], n.NameSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Name); err != nil { - return - } - } - if n.HasMime() { - util.Uint8toBytes(header[0:1], n.MimeSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Mime); err != nil { - return - } - } - if n.HasLastModifiedDate() { - util.Uint64toBytes(header[0:8], n.LastModified) - if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil { - return - } - } - if n.HasTtl() && n.Ttl != nil { - n.Ttl.ToBytes(header[0:TtlBytesLength]) - if _, err = w.Write(header[0:TtlBytesLength]); err != nil { - return - } - } - } - padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) - util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return n.DataSize, err - } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) -} - -func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { - padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) - readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding - return getBytesForFileBlock(r, offset, int(readSize)) -} - -func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { - bytes, block, err := ReadNeedleBlob(r, offset, size) - if err != nil { - return err - } - n.rawBlock = block - n.ParseNeedleHeader(bytes) - if n.Size != size { - return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) - } - switch version { - case Version1: - n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] - case Version2: - n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) - } - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) - newChecksum := NewCRC(n.Data) - if checksum != newChecksum.Value() { - return errors.New("CRC error! 
Data On Disk Corrupted") - } - n.Checksum = newChecksum - return nil -} -func (n *Needle) ParseNeedleHeader(bytes []byte) { - n.Cookie = util.BytesToUint32(bytes[0:4]) - n.Id = util.BytesToUint64(bytes[4:12]) - n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) -} -func (n *Needle) readNeedleDataVersion2(bytes []byte) { - index, lenBytes := 0, len(bytes) - if index < lenBytes { - n.DataSize = util.BytesToUint32(bytes[index : index+4]) - index = index + 4 - if int(n.DataSize)+index > lenBytes { - // this if clause is due to bug #87 and #93, fixed in v0.69 - // remove this clause later - return - } - n.Data = bytes[index : index+int(n.DataSize)] - index = index + int(n.DataSize) - n.Flags = bytes[index] - index = index + 1 - } - if index < lenBytes && n.HasName() { - n.NameSize = uint8(bytes[index]) - index = index + 1 - n.Name = bytes[index : index+int(n.NameSize)] - index = index + int(n.NameSize) - } - if index < lenBytes && n.HasMime() { - n.MimeSize = uint8(bytes[index]) - index = index + 1 - n.Mime = bytes[index : index+int(n.MimeSize)] - index = index + int(n.MimeSize) - } - if index < lenBytes && n.HasLastModifiedDate() { - n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) - index = index + LastModifiedBytesLength - } - if index < lenBytes && n.HasTtl() { - n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) - index = index + TtlBytesLength - } -} - -func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { - n = new(Needle) - if version == Version1 || version == Version2 { - bytes := make([]byte, NeedleHeaderSize) - var count int - count, err = r.ReadAt(bytes, offset) - if count <= 0 || err != nil { - return nil, 0, err - } - n.ParseNeedleHeader(bytes) - padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) - bodyLength = n.Size + NeedleChecksumSize + padding - } - return -} - -//n should be a needle already read the header -//the input stream will read until next file entry -func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) { - if bodyLength <= 0 { - return nil - } - switch version { - case Version1: - bytes := make([]byte, bodyLength) - if _, err = r.ReadAt(bytes, offset); err != nil { - return - } - n.Data = bytes[:n.Size] - n.Checksum = NewCRC(n.Data) - case Version2: - bytes := make([]byte, bodyLength) - if _, err = r.ReadAt(bytes, offset); err != nil { - return - } - n.readNeedleDataVersion2(bytes[0:n.Size]) - n.Checksum = NewCRC(n.Data) - default: - err = fmt.Errorf("Unsupported Version! 
(%d)", version) - } - return -} - -func (n *Needle) IsGzipped() bool { - return n.Flags&FlagGzip > 0 -} -func (n *Needle) SetGzipped() { - n.Flags = n.Flags | FlagGzip -} -func (n *Needle) HasName() bool { - return n.Flags&FlagHasName > 0 -} -func (n *Needle) SetHasName() { - n.Flags = n.Flags | FlagHasName -} -func (n *Needle) HasMime() bool { - return n.Flags&FlagHasMime > 0 -} -func (n *Needle) SetHasMime() { - n.Flags = n.Flags | FlagHasMime -} -func (n *Needle) HasLastModifiedDate() bool { - return n.Flags&FlagHasLastModifiedDate > 0 -} -func (n *Needle) SetHasLastModifiedDate() { - n.Flags = n.Flags | FlagHasLastModifiedDate -} -func (n *Needle) HasTtl() bool { - return n.Flags&FlagHasTtl > 0 -} -func (n *Needle) SetHasTtl() { - n.Flags = n.Flags | FlagHasTtl -} - -func (n *Needle) IsChunkedManifest() bool { - return n.Flags&FlagIsChunkManifest > 0 -} - -func (n *Needle) SetIsChunkManifest() { - n.Flags = n.Flags | FlagIsChunkManifest -} diff --git a/go/storage/needle_test.go b/go/storage/needle_test.go deleted file mode 100644 index c05afda2f..000000000 --- a/go/storage/needle_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package storage - -import "testing" - -func TestParseKeyHash(t *testing.T) { - testcases := []struct { - KeyHash string - ID uint64 - Cookie uint32 - Err bool - }{ - // normal - {"4ed4c8116e41", 0x4ed4, 0xc8116e41, false}, - // cookie with leading zeros - {"4ed401116e41", 0x4ed4, 0x01116e41, false}, - // odd length - {"ed400116e41", 0xed4, 0x00116e41, false}, - // uint - {"fed4c8114ed4c811f0116e41", 0xfed4c8114ed4c811, 0xf0116e41, false}, - // err: too short - {"4ed4c811", 0, 0, true}, - // err: too long - {"4ed4c8114ed4c8114ed4c8111", 0, 0, true}, - // err: invalid character - {"helloworld", 0, 0, true}, - } - - for _, tc := range testcases { - if id, cookie, err := ParseKeyHash(tc.KeyHash); err != nil && !tc.Err { - t.Fatalf("Parse %s error: %v", tc.KeyHash, err) - } else if err == nil && tc.Err { - t.Fatalf("Parse %s expected error got nil", tc.KeyHash) - } else if id != tc.ID || cookie != tc.Cookie { - t.Fatalf("Parse %s wrong result. 
Expected: (%d, %d) got: (%d, %d)", tc.KeyHash, tc.ID, tc.Cookie, id, cookie) - } - } -} - -func BenchmarkParseKeyHash(b *testing.B) { - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - ParseKeyHash("4ed44ed44ed44ed4c8116e41") - } -} diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go deleted file mode 100644 index c1aca52eb..000000000 --- a/go/storage/replica_placement.go +++ /dev/null @@ -1,53 +0,0 @@ -package storage - -import ( - "errors" - "fmt" -) - -type ReplicaPlacement struct { - SameRackCount int - DiffRackCount int - DiffDataCenterCount int -} - -func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { - rp := &ReplicaPlacement{} - for i, c := range t { - count := int(c - '0') - if 0 <= count && count <= 2 { - switch i { - case 0: - rp.DiffDataCenterCount = count - case 1: - rp.DiffRackCount = count - case 2: - rp.SameRackCount = count - } - } else { - return rp, errors.New("Unknown Replication Type:" + t) - } - } - return rp, nil -} - -func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { - return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b)) -} - -func (rp *ReplicaPlacement) Byte() byte { - ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount - return byte(ret) -} - -func (rp *ReplicaPlacement) String() string { - b := make([]byte, 3) - b[0] = byte(rp.DiffDataCenterCount + '0') - b[1] = byte(rp.DiffRackCount + '0') - b[2] = byte(rp.SameRackCount + '0') - return string(b) -} - -func (rp *ReplicaPlacement) GetCopyCount() int { - return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 -} diff --git a/go/storage/replica_placement_test.go b/go/storage/replica_placement_test.go deleted file mode 100644 index 9c2161e94..000000000 --- a/go/storage/replica_placement_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package storage - -import ( - "testing" -) - -func TestReplicaPlacemnetSerialDeserial(t *testing.T) { - rp, _ := NewReplicaPlacementFromString("001") - new_rp, _ := NewReplicaPlacementFromByte(rp.Byte()) - if rp.String() != new_rp.String() { - println("expected:", rp.String(), "actual:", new_rp.String()) - t.Fail() - } -} diff --git a/go/storage/store.go b/go/storage/store.go deleted file mode 100644 index dd312c075..000000000 --- a/go/storage/store.go +++ /dev/null @@ -1,340 +0,0 @@ -package storage - -import ( - "encoding/json" - "errors" - "fmt" - "math/rand" - "strconv" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/security" - "github.com/chrislusf/seaweedfs/go/util" - "github.com/golang/protobuf/proto" -) - -const ( - MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes -) - -type MasterNodes struct { - nodes []string - lastNode int -} - -func (mn *MasterNodes) String() string { - return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode) -} - -func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { - mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} - return -} -func (mn *MasterNodes) reset() { - glog.V(4).Infof("Resetting master nodes: %v", mn) - if len(mn.nodes) > 1 && mn.lastNode >= 0 { - glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) - mn.lastNode = -mn.lastNode - 1 - } -} -func (mn *MasterNodes) findMaster() (string, error) { - if len(mn.nodes) == 0 { - return "", errors.New("No master node found!") - } - if mn.lastNode < 0 { - for _, m := range mn.nodes { - glog.V(4).Infof("Listing masters on %s", m) - if masters, e := 
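// Hypothetical decoding of the ReplicaPlacement string above (not from the
// original files): digit x counts copies on other data centers, y on other
// racks, z on the same rack, and GetCopyCount adds one for the original.
rp, _ := NewReplicaPlacementFromString("200")
_ = rp.GetCopyCount() // 2+0+0+1 == 3 copies in total
_ = rp.Byte()         // 2*100 + 0*10 + 0 == 200; NewReplicaPlacementFromByte reverses it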
operation.ListMasters(m); e == nil { - if len(masters) == 0 { - continue - } - mn.nodes = append(masters, m) - mn.lastNode = rand.Intn(len(mn.nodes)) - glog.V(2).Infof("current master nodes is %v", mn) - break - } else { - glog.V(4).Infof("Failed listing masters on %s: %v", m, e) - } - } - } - if mn.lastNode < 0 { - return "", errors.New("No master node available!") - } - return mn.nodes[mn.lastNode], nil -} - -/* - * A VolumeServer contains one Store - */ -type Store struct { - Ip string - Port int - PublicUrl string - Locations []*DiskLocation - dataCenter string //optional informaton, overwriting master setting if exists - rack string //optional information, overwriting master setting if exists - connected bool - volumeSizeLimit uint64 //read from the master - masterNodes *MasterNodes -} - -func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes) - return -} - -func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} - s.Locations = make([]*DiskLocation, 0) - for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) - location.loadExistingVolumes(needleMapKind) - s.Locations = append(s.Locations, location) - } - return -} -func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { - rt, e := NewReplicaPlacementFromString(replicaPlacement) - if e != nil { - return e - } - ttl, e := ReadTTL(ttlString) - if e != nil { - return e - } - for _, range_string := range strings.Split(volumeListString, ",") { - if strings.Index(range_string, "-") < 0 { - id_string := range_string - id, err := NewVolumeId(id_string) - if err != nil { - return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) - } - e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) - } else { - pair := strings.Split(range_string, "-") - start, start_err := strconv.ParseUint(pair[0], 10, 64) - if start_err != nil { - return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0]) - } - end, end_err := strconv.ParseUint(pair[1], 10, 64) - if end_err != nil { - return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) - } - for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { - e = err - } - } - } - } - return e -} -func (s *Store) DeleteCollection(collection string) (e error) { - for _, location := range s.Locations { - e = location.DeleteCollectionFromDiskLocation(collection) - if e != nil { - return - } - } - return -} - -func (s *Store) findVolume(vid VolumeId) *Volume { - for _, location := range s.Locations { - if v, found := location.volumes[vid]; found { - return v - } - } - return nil -} -func (s *Store) findFreeLocation() (ret *DiskLocation) { - max := 0 - for _, location := range s.Locations { - currentFreeCount := location.MaxVolumeCount - len(location.volumes) - if currentFreeCount > max { - max = currentFreeCount - ret = location - } - } - return ret -} -func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { - if s.findVolume(vid) != 
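// Hypothetical call into AddVolume above (not from the original files), which
// accepts single ids and inclusive ranges in one comma-separated list; the
// TTL string is illustrative only, and s is any *Store:
//
//	err := s.AddVolume("1-3,5", "pictures", NeedleMapInMemory, "001", "3d")
//	// volumes 1, 2, 3 and 5 are created in the DiskLocation with most free slots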
nil { - return fmt.Errorf("Volume Id %d already exists!", vid) - } - if location := s.findFreeLocation(); location != nil { - glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", - location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { - location.volumes[vid] = volume - return nil - } else { - return err - } - } - return fmt.Errorf("No more free space left") -} - -func (s *Store) Status() []*VolumeInfo { - var stats []*VolumeInfo - for _, location := range s.Locations { - for k, v := range location.volumes { - s := &VolumeInfo{ - Id: VolumeId(k), - Size: v.ContentSize(), - Collection: v.Collection, - ReplicaPlacement: v.ReplicaPlacement, - Version: v.Version(), - FileCount: v.nm.FileCount(), - DeleteCount: v.nm.DeletedCount(), - DeletedByteCount: v.nm.DeletedSize(), - ReadOnly: v.readOnly, - Ttl: v.Ttl} - stats = append(stats, s) - } - } - sortVolumeInfos(stats) - return stats -} - -func (s *Store) SetDataCenter(dataCenter string) { - s.dataCenter = dataCenter -} -func (s *Store) SetRack(rack string) { - s.rack = rack -} - -func (s *Store) SetBootstrapMaster(bootstrapMaster string) { - s.masterNodes = NewMasterNodes(bootstrapMaster) -} -func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { - masterNode, e = s.masterNodes.findMaster() - if e != nil { - return - } - var volumeMessages []*operation.VolumeInformationMessage - maxVolumeCount := 0 - var maxFileKey uint64 - for _, location := range s.Locations { - maxVolumeCount = maxVolumeCount + location.MaxVolumeCount - for k, v := range location.volumes { - if maxFileKey < v.nm.MaxFileKey() { - maxFileKey = v.nm.MaxFileKey() - } - if !v.expired(s.volumeSizeLimit) { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), - Version: proto.Uint32(uint32(v.Version())), - Ttl: proto.Uint32(v.Ttl.ToUint32()), - } - volumeMessages = append(volumeMessages, volumeMessage) - } else { - if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { - location.deleteVolumeById(v.Id) - glog.V(0).Infoln("volume", v.Id, "is deleted.") - } else { - glog.V(0).Infoln("volume", v.Id, "is expired.") - } - } - } - } - - joinMessage := &operation.JoinMessage{ - IsInit: proto.Bool(!s.connected), - Ip: proto.String(s.Ip), - Port: proto.Uint32(uint32(s.Port)), - PublicUrl: proto.String(s.PublicUrl), - MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)), - MaxFileKey: proto.Uint64(maxFileKey), - DataCenter: proto.String(s.dataCenter), - Rack: proto.String(s.rack), - Volumes: volumeMessages, - } - - data, err := proto.Marshal(joinMessage) - if err != nil { - return "", "", err - } - - joinUrl := "http://" + masterNode + "/dir/join" - glog.V(4).Infof("Connecting to %s ...", joinUrl) - - jsonBlob, err := util.PostBytes(joinUrl, data) - if err != nil { - s.masterNodes.reset() - return "", "", err - } - var ret operation.JoinResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) - s.masterNodes.reset() - return 
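// Hypothetical driver loop for SendHeartbeatToMaster above (not from the
// original files): a volume server heartbeats periodically, and the master's
// JSON reply carries the volume size limit and the signing secret. The
// interval and s are illustrative only.
//
//	for {
//		if _, secret, err := s.SendHeartbeatToMaster(); err == nil {
//			_ = secret
//		}
//		time.Sleep(5 * time.Second)
//	}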
masterNode, "", err - } - if ret.Error != "" { - s.masterNodes.reset() - return masterNode, "", errors.New(ret.Error) - } - s.volumeSizeLimit = ret.VolumeSizeLimit - secretKey = security.Secret(ret.SecretKey) - s.connected = true - return -} -func (s *Store) Close() { - for _, location := range s.Locations { - for _, v := range location.volumes { - v.Close() - } - } -} -func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { - if v := s.findVolume(i); v != nil { - if v.readOnly { - err = fmt.Errorf("Volume %d is read only", i) - return - } - if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { - size, err = v.write(n) - } else { - err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize()) - } - if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { - glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) - if _, _, e := s.SendHeartbeatToMaster(); e != nil { - glog.V(0).Infoln("error when reporting size:", e) - } - } - return - } - glog.V(0).Infoln("volume", i, "not found!") - err = fmt.Errorf("Volume %d not found!", i) - return -} -func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { - if v := s.findVolume(i); v != nil && !v.readOnly { - return v.delete(n) - } - return 0, nil -} -func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { - if v := s.findVolume(i); v != nil { - return v.readNeedle(n) - } - return 0, fmt.Errorf("Volume %v not found!", i) -} -func (s *Store) GetVolume(i VolumeId) *Volume { - return s.findVolume(i) -} - -func (s *Store) HasVolume(i VolumeId) bool { - v := s.findVolume(i) - return v != nil -} diff --git a/go/storage/store_vacuum.go b/go/storage/store_vacuum.go deleted file mode 100644 index 52343c898..000000000 --- a/go/storage/store_vacuum.go +++ /dev/null @@ -1,44 +0,0 @@ -package storage - -import ( - "fmt" - "strconv" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString), false - } - garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) - if e != nil { - return fmt.Errorf("garbageThreshold %s is not a valid float number", garbageThresholdString), false - } - if v := s.findVolume(vid); v != nil { - glog.V(3).Infoln(vid, "garbage level is", v.garbageLevel()) - return nil, garbageThreshold < v.garbageLevel() - } - return fmt.Errorf("volume id %d is not found during check compact", vid), false -} -func (s *Store) CompactVolume(volumeIdString string) error { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) - } - if v := s.findVolume(vid); v != nil { - return v.Compact() - } - return fmt.Errorf("volume id %d is not found during compact", vid) -} -func (s *Store) CommitCompactVolume(volumeIdString string) error { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) - } - if v := s.findVolume(vid); v != nil { - return v.commitCompact() - } - return fmt.Errorf("volume id %d is not found during commit compact", vid) -} diff --git a/go/storage/volume.go b/go/storage/volume.go deleted file mode 100644 index 7e330a9e4..000000000 --- a/go/storage/volume.go +++ /dev/null @@ -1,430 +0,0 @@ -package 
diff --git a/go/storage/volume.go b/go/storage/volume.go
deleted file mode 100644
index 7e330a9e4..000000000
--- a/go/storage/volume.go
+++ /dev/null
@@ -1,430 +0,0 @@
-package storage
-
-import (
-	"bytes"
-	"errors"
-	"fmt"
-	"io"
-	"os"
-	"path"
-	"sync"
-	"time"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-)
-
-type Volume struct {
-	Id            VolumeId
-	dir           string
-	Collection    string
-	dataFile      *os.File
-	nm            NeedleMapper
-	needleMapKind NeedleMapType
-	readOnly      bool
-
-	SuperBlock
-
-	dataFileAccessLock sync.Mutex
-	lastModifiedTime   uint64 //unix time in seconds
-}
-
-func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
-	v = &Volume{dir: dirname, Collection: collection, Id: id}
-	v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
-	v.needleMapKind = needleMapKind
-	e = v.load(true, true, needleMapKind)
-	return
-}
-func (v *Volume) String() string {
-	return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
-}
-
-func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
-	v = &Volume{dir: dirname, Collection: collection, Id: id}
-	v.SuperBlock = SuperBlock{}
-	v.needleMapKind = needleMapKind
-	e = v.load(false, false, needleMapKind)
-	return
-}
-func (v *Volume) FileName() (fileName string) {
-	if v.Collection == "" {
-		fileName = path.Join(v.dir, v.Id.String())
-	} else {
-		fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
-	}
-	return
-}
-func (v *Volume) DataFile() *os.File {
-	return v.dataFile
-}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
-	var e error
-	fileName := v.FileName()
-
-	if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
-		if !canRead {
-			return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
-		}
-		if canWrite {
-			v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
-			v.lastModifiedTime = uint64(modifiedTime.Unix())
-		} else {
-			glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
-			v.dataFile, e = os.Open(fileName + ".dat")
-			v.readOnly = true
-		}
-	} else {
-		if createDatIfMissing {
-			v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
-		} else {
-			return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
-		}
-	}
-
-	if e != nil {
-		if !os.IsPermission(e) {
-			return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
-		}
-	}
-
-	if v.ReplicaPlacement == nil {
-		e = v.readSuperBlock()
-	} else {
-		e = v.maybeWriteSuperBlock()
-	}
-	if e == nil && alsoLoadIndex {
-		var indexFile *os.File
-		if v.readOnly {
-			glog.V(1).Infoln("open to read file", fileName+".idx")
-			if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
-				return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
-			}
-		} else {
-			glog.V(1).Infoln("open to write file", fileName+".idx")
-			if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
-				return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
-			}
-		}
-		switch needleMapKind {
-		case NeedleMapInMemory:
-			glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
-			if v.nm, e = LoadNeedleMap(indexFile); e != nil {
-				glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
-			}
-		case NeedleMapLevelDb:
-			glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
-			if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
-				glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
-			}
-		case NeedleMapBoltDb:
-			glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
-			if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
-				glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
-			}
-		}
-	}
-	return e
-}
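Editorial note: FileName above determines the on-disk base name; a self-contained restatement of the rule (the sample directory and values are illustrative).

// A volume with Collection "pics" and Id 7 under /data becomes
// /data/pics_7 (so /data/pics_7.dat and /data/pics_7.idx);
// an empty collection drops the prefix, giving /data/7.
func volumeBaseName(dir, collection, id string) string {
	if collection == "" {
		return path.Join(dir, id)
	}
	return path.Join(dir, collection+"_"+id)
}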
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) - } - case NeedleMapBoltDb: - glog.V(0).Infoln("loading boltdb file", fileName+".bdb") - if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { - glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) - } - } - } - return e -} -func (v *Volume) Version() Version { - return v.SuperBlock.Version() -} -func (v *Volume) Size() int64 { - stat, e := v.dataFile.Stat() - if e == nil { - return stat.Size() - } - glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) - return -1 -} - -// Close cleanly shuts down this volume -func (v *Volume) Close() { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - v.nm.Close() - _ = v.dataFile.Close() -} - -func (v *Volume) NeedToReplicate() bool { - return v.ReplicaPlacement.GetCopyCount() > 1 -} - -// isFileUnchanged checks whether this needle to write is same as last one. -// It requires serialized access in the same volume. -func (v *Volume) isFileUnchanged(n *Needle) bool { - if v.Ttl.String() != "" { - return false - } - nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - oldNeedle := new(Needle) - err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) - if err != nil { - glog.V(0).Infof("Failed to check updated file %v", err) - return false - } - defer oldNeedle.ReleaseMemory() - if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { - n.DataSize = oldNeedle.DataSize - return true - } - } - return false -} - -// Destroy removes everything related to this volume -func (v *Volume) Destroy() (err error) { - if v.readOnly { - err = fmt.Errorf("%s is read-only", v.dataFile.Name()) - return - } - v.Close() - err = os.Remove(v.dataFile.Name()) - if err != nil { - return - } - err = v.nm.Destroy() - return -} - -// AppendBlob append a blob to end of the data file, used in replication -func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { - if v.readOnly { - err = fmt.Errorf("%s is read-only", v.dataFile.Name()) - return - } - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - if offset, err = v.dataFile.Seek(0, 2); err != nil { - glog.V(0).Infof("failed to seek the end of file: %v", err) - return - } - //ensure file writing starting from aligned positions - if offset%NeedlePaddingSize != 0 { - offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) - if offset, err = v.dataFile.Seek(offset, 0); err != nil { - glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) - return - } - } - v.dataFile.Write(b) - return -} - -func (v *Volume) write(n *Needle) (size uint32, err error) { - glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) - if v.readOnly { - err = fmt.Errorf("%s is read-only", v.dataFile.Name()) - return - } - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - if v.isFileUnchanged(n) { - size = n.DataSize - glog.V(4).Infof("needle is unchanged!") - return - } - var offset int64 - if offset, err = v.dataFile.Seek(0, 2); err != nil { - glog.V(0).Infof("failed to seek the end of file: %v", err) - return - } - - //ensure file writing starting from aligned positions - if offset%NeedlePaddingSize != 0 { - offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) - if offset, err = v.dataFile.Seek(offset, 0); err != nil { - glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) - return - } - } - - if size, err = n.Append(v.dataFile, 
-func (v *Volume) delete(n *Needle) (uint32, error) {
-	glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
-	if v.readOnly {
-		return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
-	}
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
-	nv, ok := v.nm.Get(n.Id)
-	//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
-	if ok {
-		size := nv.Size
-		if err := v.nm.Delete(n.Id); err != nil {
-			return size, err
-		}
-		if _, err := v.dataFile.Seek(0, 2); err != nil {
-			return size, err
-		}
-		n.Data = nil
-		_, err := n.Append(v.dataFile, v.Version())
-		return size, err
-	}
-	return 0, nil
-}
-
-// readNeedle fills in the Needle content by looking up n.Id in the NeedleMapper
-func (v *Volume) readNeedle(n *Needle) (int, error) {
-	nv, ok := v.nm.Get(n.Id)
-	if !ok || nv.Offset == 0 {
-		return -1, errors.New("Not Found")
-	}
-	err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
-	if err != nil {
-		return 0, err
-	}
-	bytesRead := len(n.Data)
-	if !n.HasTtl() {
-		return bytesRead, nil
-	}
-	ttlMinutes := n.Ttl.Minutes()
-	if ttlMinutes == 0 {
-		return bytesRead, nil
-	}
-	if !n.HasLastModifiedDate() {
-		return bytesRead, nil
-	}
-	if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
-		return bytesRead, nil
-	}
-	n.ReleaseMemory()
-	return -1, errors.New("Not Found")
-}
-
-func ScanVolumeFile(dirname string, collection string, id VolumeId,
-	needleMapKind NeedleMapType,
-	visitSuperBlock func(SuperBlock) error,
-	readNeedleBody bool,
-	visitNeedle func(n *Needle, offset int64) error) (err error) {
-	var v *Volume
-	if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
-		return fmt.Errorf("Failed to load volume %d: %v", id, err)
-	}
-	if err = visitSuperBlock(v.SuperBlock); err != nil {
-		return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
-	}
-
-	version := v.Version()
-
-	offset := int64(SuperBlockSize)
-	n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
-	if e != nil {
-		err = fmt.Errorf("cannot read needle header: %v", e)
-		return
-	}
-	for n != nil {
-		if readNeedleBody {
-			if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
-				glog.V(0).Infof("cannot read needle body: %v", err)
-				//err = fmt.Errorf("cannot read needle body: %v", err)
-				//return
-			}
-			if n.DataSize >= n.Size {
-				// this should come from a bug reported on #87 and #93
-				// fixed in v0.69
-				// remove this whole "if" clause later, long after 0.69
-				oldRest, oldSize := rest, n.Size
-				padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
-				n.Size = 0
-				rest = n.Size + NeedleChecksumSize + padding
-				if rest%NeedlePaddingSize != 0 {
-					rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
-				}
-				glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
-			}
-		}
-		if err = visitNeedle(n, offset); err != nil {
-			glog.V(0).Infof("visit needle error: %v", err)
-		}
-		offset += int64(NeedleHeaderSize) + int64(rest)
-		glog.V(4).Infof("==> new entry offset %d", offset)
-		if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
-			if err == io.EOF {
-				return nil
-			}
-			return fmt.Errorf("cannot read needle header: %v", err)
-		}
-		glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
-	}
-
-	return
-}
glog.V(0).Infof("visit needle error: %v", err) - } - offset += int64(NeedleHeaderSize) + int64(rest) - glog.V(4).Infof("==> new entry offset %d", offset) - if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("cannot read needle header: %v", err) - } - glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) - } - - return -} - -func (v *Volume) ContentSize() uint64 { - return v.nm.ContentSize() -} - -func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) { - exists = true - fi, err := os.Stat(filename) - if os.IsNotExist(err) { - exists = false - return - } - if fi.Mode()&0400 != 0 { - canRead = true - } - if fi.Mode()&0200 != 0 { - canWrite = true - } - modTime = fi.ModTime() - return -} - -// volume is expired if modified time + volume ttl < now -// except when volume is empty -// or when the volume does not have a ttl -// or when volumeSizeLimit is 0 when server just starts -func (v *Volume) expired(volumeSizeLimit uint64) bool { - if volumeSizeLimit == 0 { - //skip if we don't know size limit - return false - } - if v.ContentSize() == 0 { - return false - } - if v.Ttl == nil || v.Ttl.Minutes() == 0 { - return false - } - glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) - livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 - glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) - if int64(v.Ttl.Minutes()) < livedMinutes { - return true - } - return false -} - -// wait either maxDelayMinutes or 10% of ttl minutes -func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { - if v.Ttl == nil || v.Ttl.Minutes() == 0 { - return false - } - removalDelay := v.Ttl.Minutes() / 10 - if removalDelay > maxDelayMinutes { - removalDelay = maxDelayMinutes - } - - if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { - return true - } - return false -} diff --git a/go/storage/volume_id.go b/go/storage/volume_id.go deleted file mode 100644 index 0333c6cf0..000000000 --- a/go/storage/volume_id.go +++ /dev/null @@ -1,18 +0,0 @@ -package storage - -import ( - "strconv" -) - -type VolumeId uint32 - -func NewVolumeId(vid string) (VolumeId, error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err -} -func (vid *VolumeId) String() string { - return strconv.FormatUint(uint64(*vid), 10) -} -func (vid *VolumeId) Next() VolumeId { - return VolumeId(uint32(*vid) + 1) -} diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go deleted file mode 100644 index a2f139c89..000000000 --- a/go/storage/volume_info.go +++ /dev/null @@ -1,65 +0,0 @@ -package storage - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/go/operation" - "sort" -) - -type VolumeInfo struct { - Id VolumeId - Size uint64 - ReplicaPlacement *ReplicaPlacement - Ttl *TTL - Collection string - Version Version - FileCount int - DeleteCount int - DeletedByteCount uint64 - ReadOnly bool -} - -func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) { - vi = VolumeInfo{ - Id: VolumeId(*m.Id), - Size: *m.Size, - Collection: *m.Collection, - FileCount: int(*m.FileCount), - DeleteCount: int(*m.DeleteCount), - DeletedByteCount: *m.DeletedByteCount, - ReadOnly: *m.ReadOnly, - Version: Version(*m.Version), - } - rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement)) - if e != nil { - return vi, e - } - vi.ReplicaPlacement = rp - vi.Ttl = LoadTTLFromUint32(*m.Ttl) - 
diff --git a/go/storage/volume_id.go b/go/storage/volume_id.go
deleted file mode 100644
index 0333c6cf0..000000000
--- a/go/storage/volume_id.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package storage
-
-import (
-	"strconv"
-)
-
-type VolumeId uint32
-
-func NewVolumeId(vid string) (VolumeId, error) {
-	// parse with bitSize 32 so values that overflow a VolumeId are rejected
-	volumeId, err := strconv.ParseUint(vid, 10, 32)
-	return VolumeId(volumeId), err
-}
-func (vid *VolumeId) String() string {
-	return strconv.FormatUint(uint64(*vid), 10)
-}
-func (vid *VolumeId) Next() VolumeId {
-	return VolumeId(uint32(*vid) + 1)
-}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
deleted file mode 100644
index a2f139c89..000000000
--- a/go/storage/volume_info.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package storage
-
-import (
-	"fmt"
-	"sort"
-
-	"github.com/chrislusf/seaweedfs/go/operation"
-)
-
-type VolumeInfo struct {
-	Id               VolumeId
-	Size             uint64
-	ReplicaPlacement *ReplicaPlacement
-	Ttl              *TTL
-	Collection       string
-	Version          Version
-	FileCount        int
-	DeleteCount      int
-	DeletedByteCount uint64
-	ReadOnly         bool
-}
-
-func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
-	vi = VolumeInfo{
-		Id:               VolumeId(*m.Id),
-		Size:             *m.Size,
-		Collection:       *m.Collection,
-		FileCount:        int(*m.FileCount),
-		DeleteCount:      int(*m.DeleteCount),
-		DeletedByteCount: *m.DeletedByteCount,
-		ReadOnly:         *m.ReadOnly,
-		Version:          Version(*m.Version),
-	}
-	rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
-	if e != nil {
-		return vi, e
-	}
-	vi.ReplicaPlacement = rp
-	vi.Ttl = LoadTTLFromUint32(*m.Ttl)
-	return vi, nil
-}
-
-func (vi VolumeInfo) String() string {
-	return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
-		vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
-}
-
-/* VolumeInfo sorting */
-
-type volumeInfos []*VolumeInfo
-
-func (vis volumeInfos) Len() int {
-	return len(vis)
-}
-
-func (vis volumeInfos) Less(i, j int) bool {
-	return vis[i].Id < vis[j].Id
-}
-
-func (vis volumeInfos) Swap(i, j int) {
-	vis[i], vis[j] = vis[j], vis[i]
-}
-
-func sortVolumeInfos(vis volumeInfos) {
-	sort.Sort(vis)
-}
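Editorial note: the volumeInfos shim exists only to satisfy sort.Sort; on Go 1.8 or later, sort.Slice would express the same ordering without the three-method boilerplate, e.g.:

// Equivalent of sortVolumeInfos using sort.Slice (Go >= 1.8).
func sortVolumeInfosBySlice(vis []*VolumeInfo) {
	sort.Slice(vis, func(i, j int) bool { return vis[i].Id < vis[j].Id })
}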
diff --git a/go/storage/volume_info_test.go b/go/storage/volume_info_test.go
deleted file mode 100644
index 9a9c43ad2..000000000
--- a/go/storage/volume_info_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package storage
-
-import "testing"
-
-func TestSortVolumeInfos(t *testing.T) {
-	vis := []*VolumeInfo{
-		&VolumeInfo{
-			Id: 2,
-		},
-		&VolumeInfo{
-			Id: 1,
-		},
-		&VolumeInfo{
-			Id: 3,
-		},
-	}
-	sortVolumeInfos(vis)
-	for i := 0; i < len(vis); i++ {
-		if vis[i].Id != VolumeId(i+1) {
-			t.Fatal()
-		}
-	}
-}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
deleted file mode 100644
index e37360075..000000000
--- a/go/storage/volume_super_block.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package storage
-
-import (
-	"fmt"
-	"os"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-const (
-	SuperBlockSize = 8
-)
-
-/*
-* The super block currently has 8 bytes allocated for each volume.
-* Byte 0: version, 1 or 2
-* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc.
-* Bytes 2 and 3: Time to live. See TTL for the definition.
-* Bytes 4 and 5: The number of times the volume has been compacted.
-* Remaining bytes: reserved
- */
-type SuperBlock struct {
-	version          Version
-	ReplicaPlacement *ReplicaPlacement
-	Ttl              *TTL
-	CompactRevision  uint16
-}
-
-func (s *SuperBlock) Version() Version {
-	return s.version
-}
-func (s *SuperBlock) Bytes() []byte {
-	header := make([]byte, SuperBlockSize)
-	header[0] = byte(s.version)
-	header[1] = s.ReplicaPlacement.Byte()
-	s.Ttl.ToBytes(header[2:4])
-	util.Uint16toBytes(header[4:6], s.CompactRevision)
-	return header
-}
-
-func (v *Volume) maybeWriteSuperBlock() error {
-	stat, e := v.dataFile.Stat()
-	if e != nil {
-		glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e)
-		return e
-	}
-	if stat.Size() == 0 {
-		v.SuperBlock.version = CurrentVersion
-		_, e = v.dataFile.Write(v.SuperBlock.Bytes())
-		if e != nil && os.IsPermission(e) {
-			//read-only, but zero length - recreate it!
-			if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
-				if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
-					v.readOnly = false
-				}
-			}
-		}
-	}
-	return e
-}
-func (v *Volume) readSuperBlock() (err error) {
-	if _, err = v.dataFile.Seek(0, 0); err != nil {
-		return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
-	}
-	header := make([]byte, SuperBlockSize)
-	if _, e := v.dataFile.Read(header); e != nil {
-		return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e)
-	}
-	v.SuperBlock, err = ParseSuperBlock(header)
-	return err
-}
-func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
-	superBlock.version = Version(header[0])
-	if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
-		err = fmt.Errorf("cannot read replica type: %s", err.Error())
-	}
-	superBlock.Ttl = LoadTTLFromBytes(header[2:4])
-	superBlock.CompactRevision = util.BytesToUint16(header[4:6])
-	return
-}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
deleted file mode 100644
index 13db4b194..000000000
--- a/go/storage/volume_super_block_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package storage
-
-import (
-	"testing"
-)
-
-func TestSuperBlockReadWrite(t *testing.T) {
-	rp, _ := NewReplicaPlacementFromByte(byte(001))
-	ttl, _ := ReadTTL("15d")
-	s := &SuperBlock{
-		version:          CurrentVersion,
-		ReplicaPlacement: rp,
-		Ttl:              ttl,
-	}
-
-	bytes := s.Bytes()
-
-	if !(bytes[2] == 15 && bytes[3] == Day) {
-		println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
-		t.Fail()
-	}
-
-}
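Editorial note: combining the layout comment with Bytes(), a worked encoding of a sample super block; the values are illustrative, and the byte order of the revision is an assumption based on ParseSuperBlock reading it back through util.BytesToUint16.

// Version 2, replica placement "001", ttl 15d, compact revision 3:
sample := []byte{
	2,       // byte 0: version
	0x01,    // byte 1: replica placement "001"
	15, Day, // bytes 2-3: ttl count and unit ("15d")
	0, 3,    // bytes 4-5: compact revision as uint16 (assumed big-endian)
	0, 0,    // bytes 6-7: reserved
}
_ = sample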
diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go
deleted file mode 100644
index 2c72d62f0..000000000
--- a/go/storage/volume_sync.go
+++ /dev/null
@@ -1,213 +0,0 @@
-package storage
-
-import (
-	"fmt"
-	"io"
-	"io/ioutil"
-	"net/url"
-	"os"
-	"sort"
-	"strconv"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/operation"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-// A volume syncs with a master volume in two steps:
-// 1. The slave checks the master side to find the subscription checkpoint
-//    that sets up the replication.
-// 2. The slave receives the updates from the master.
-
-/*
-Assume the slave volume needs to follow the master volume.
-
-The master volume could be compacted, and could be many files ahead of
-the slave volume.
-
-Step 1:
-The slave volume will ask the master volume for a snapshot
-of (existing file entries, last offset, number of compacted times).
-
-For each entry x in the master's existing file entries:
-	if x does not exist locally:
-		add x locally
-
-For each entry y in the local slave's existing file entries:
-	if y does not exist on the master:
-		delete y locally
-
-Step 2:
-After this, use the last offset and the number of compacted times to request
-that the master volume send new file entries, and keep looping. If the number
-of compacted times has changed, go back to step 1 (very likely this can be
-optimized more later).
-
-*/
-
-func (v *Volume) Synchronize(volumeServer string) (err error) {
-	var lastCompactRevision uint16 = 0
-	var compactRevision uint16 = 0
-	var masterMap CompactMap
-	for i := 0; i < 3; i++ {
-		if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
-			return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
-		}
-		if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
-			if err = v.Compact(); err != nil {
-				return fmt.Errorf("Compact Volume before synchronizing %v", err)
-			}
-			if err = v.commitCompact(); err != nil {
-				return fmt.Errorf("Commit Compact before synchronizing %v", err)
-			}
-		}
-		lastCompactRevision = compactRevision
-		if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
-			return
-		}
-	}
-	return
-}
-
-type ByOffset []NeedleValue
-
-func (a ByOffset) Len() int           { return len(a) }
-func (a ByOffset) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
-
-// trySynchronizing syncs with the remote volume server incrementally by
-// making up the delta between the local and remote entries.
-func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
-	slaveIdxFile, err := os.Open(v.nm.IndexFileName())
-	if err != nil {
-		return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
-	}
-	defer slaveIdxFile.Close()
-	slaveMap, err := LoadNeedleMap(slaveIdxFile)
-	if err != nil {
-		return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
-	}
-	var delta []NeedleValue
-	if err := masterMap.Visit(func(needleValue NeedleValue) error {
-		if needleValue.Key == 0 {
-			return nil
-		}
-		if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok {
-			return nil // skip intersection
-		}
-		delta = append(delta, needleValue)
-		return nil
-	}); err != nil {
-		return fmt.Errorf("Add master entry: %v", err)
-	}
-	if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
-		if needleValue.Key == 0 {
-			return nil
-		}
-		if _, ok := masterMap.Get(needleValue.Key); ok {
-			return nil // skip intersection
-		}
-		needleValue.Size = 0
-		delta = append(delta, needleValue)
-		return nil
-	}); err != nil {
-		return fmt.Errorf("Remove local entry: %v", err)
-	}
-
-	// replay in the same order as the remote .dat file's needle entries
-	sort.Sort(ByOffset(delta))
-
-	// make up the delta
-	fetchCount := 0
-	volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
-	for _, needleValue := range delta {
-		if needleValue.Size == 0 {
-			// remove the file entry locally
-			v.removeNeedle(needleValue.Key)
-			continue
-		}
-		// add the master's file entry to the local data file
-		if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
-			glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
-			return err
-		}
-		fetchCount++
-	}
-	glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
-	return nil
-}
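Editorial note: the two Visit passes above compute a symmetric difference in which master-only keys are fetched and slave-only keys are tombstoned with size 0; a standalone restatement over plain maps (the types are deliberately simplified).

// Keys only on the master are fetched; keys only on the slave are returned
// as removals, mirroring the size-0 convention used above.
func needleDelta(master, slave map[uint64]uint32) (fetch, remove []uint64) {
	for k := range master {
		if _, ok := slave[k]; !ok {
			fetch = append(fetch, k)
		}
	}
	for k := range slave {
		if _, ok := master[k]; !ok {
			remove = append(remove, k)
		}
	}
	return
}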
-func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
-	m = NewCompactMap()
-
-	syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
-	if err != nil {
-		return m, 0, 0, err
-	}
-
-	total := 0
-	err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
-		// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
-		if offset != 0 && size != 0 {
-			m.Set(Key(key), offset, size)
-		} else {
-			m.Delete(Key(key))
-		}
-		total++
-	})
-
-	glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
-	return m, syncStatus.TailOffset, syncStatus.CompactRevision, err
-}
-
-func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
-	var syncStatus = operation.SyncVolumeResponse{}
-	if stat, err := v.dataFile.Stat(); err == nil {
-		syncStatus.TailOffset = uint64(stat.Size())
-	}
-	syncStatus.IdxFileSize = v.nm.IndexFileSize()
-	syncStatus.CompactRevision = v.SuperBlock.CompactRevision
-	syncStatus.Ttl = v.SuperBlock.Ttl.String()
-	syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
-	return syncStatus
-}
-
-func (v *Volume) IndexFileContent() ([]byte, error) {
-	return v.nm.IndexFileContent()
-}
-
-// removeNeedle removes one needle by needle key
-func (v *Volume) removeNeedle(key Key) {
-	n := new(Needle)
-	n.Id = uint64(key)
-	v.delete(n)
-}
-
-// fetchNeedle fetches a remote volume needle by vid, id, offset.
-// The compact revision is checked first in case the remote volume
-// has been compacted and the offset is no longer valid.
-func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
-	needleValue NeedleValue, compactRevision uint16) error {
-	// add the master's file entry to the local data file
-	values := make(url.Values)
-	values.Add("revision", strconv.Itoa(int(compactRevision)))
-	values.Add("volume", v.Id.String())
-	values.Add("id", needleValue.Key.String())
-	values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
-	values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
-	glog.V(4).Infof("Fetch %+v", needleValue)
-	return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
-		b, err := ioutil.ReadAll(r)
-		if err != nil {
-			return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
-		}
-		offset, err := v.AppendBlob(b)
-		if err != nil {
-			return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
-		}
-		// println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
-		v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size)
-		return nil
-	})
-}
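Editorial note: needle offsets are stored and exchanged in units of NeedlePaddingSize (8 bytes), as the offset/NeedlePaddingSize and nv.Offset*NeedlePaddingSize conversions above show; standalone, with the helper names being illustrative:

// Converting between byte offsets and stored offsets; e.g. byte offset
// 4096 is stored as 512, and 512 maps back to byte offset 4096.
func toStoredOffset(byteOffset int64) uint32 { return uint32(byteOffset / NeedlePaddingSize) }
func toByteOffset(stored uint32) int64      { return int64(stored) * NeedlePaddingSize }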
byte(ttl >> 8) - return LoadTTLFromBytes(input) -} - -// save stored bytes to an output with 2 bytes -func (t *TTL) ToBytes(output []byte) { - output[0] = t.count - output[1] = t.unit -} - -func (t *TTL) ToUint32() (output uint32) { - output = uint32(t.count) << 8 - output += uint32(t.unit) - return output -} - -func (t *TTL) String() string { - if t == nil || t.count == 0 { - return "" - } - if t.unit == Empty { - return "" - } - countString := strconv.Itoa(int(t.count)) - switch t.unit { - case Minute: - return countString + "m" - case Hour: - return countString + "h" - case Day: - return countString + "d" - case Week: - return countString + "w" - case Month: - return countString + "M" - case Year: - return countString + "y" - } - return "" -} - -func toStoredByte(readableUnitByte byte) byte { - switch readableUnitByte { - case 'm': - return Minute - case 'h': - return Hour - case 'd': - return Day - case 'w': - return Week - case 'M': - return Month - case 'y': - return Year - } - return 0 -} - -func (t TTL) Minutes() uint32 { - switch t.unit { - case Empty: - return 0 - case Minute: - return uint32(t.count) - case Hour: - return uint32(t.count) * 60 - case Day: - return uint32(t.count) * 60 * 24 - case Week: - return uint32(t.count) * 60 * 24 * 7 - case Month: - return uint32(t.count) * 60 * 24 * 31 - case Year: - return uint32(t.count) * 60 * 24 * 365 - } - return 0 -} diff --git a/go/storage/volume_ttl_test.go b/go/storage/volume_ttl_test.go deleted file mode 100644 index 216469a4c..000000000 --- a/go/storage/volume_ttl_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package storage - -import ( - "testing" -) - -func TestTTLReadWrite(t *testing.T) { - ttl, _ := ReadTTL("") - if ttl.Minutes() != 0 { - t.Errorf("empty ttl:%v", ttl) - } - - ttl, _ = ReadTTL("9") - if ttl.Minutes() != 9 { - t.Errorf("9 ttl:%v", ttl) - } - - ttl, _ = ReadTTL("8m") - if ttl.Minutes() != 8 { - t.Errorf("8m ttl:%v", ttl) - } - - ttl, _ = ReadTTL("5h") - if ttl.Minutes() != 300 { - t.Errorf("5h ttl:%v", ttl) - } - - ttl, _ = ReadTTL("5d") - if ttl.Minutes() != 5*24*60 { - t.Errorf("5d ttl:%v", ttl) - } - - ttl, _ = ReadTTL("5w") - if ttl.Minutes() != 5*7*24*60 { - t.Errorf("5w ttl:%v", ttl) - } - - ttl, _ = ReadTTL("5M") - if ttl.Minutes() != 5*31*24*60 { - t.Errorf("5M ttl:%v", ttl) - } - - ttl, _ = ReadTTL("5y") - if ttl.Minutes() != 5*365*24*60 { - t.Errorf("5y ttl:%v", ttl) - } - - output := make([]byte, 2) - ttl.ToBytes(output) - ttl2 := LoadTTLFromBytes(output) - if ttl.Minutes() != ttl2.Minutes() { - t.Errorf("ttl:%v ttl2:%v", ttl, ttl2) - } - - ttl3 := LoadTTLFromUint32(ttl.ToUint32()) - if ttl.Minutes() != ttl3.Minutes() { - t.Errorf("ttl:%v ttl3:%v", ttl, ttl3) - } - -} diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go deleted file mode 100644 index 5ba8d575c..000000000 --- a/go/storage/volume_vacuum.go +++ /dev/null @@ -1,93 +0,0 @@ -package storage - -import ( - "fmt" - "os" - "time" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -func (v *Volume) garbageLevel() float64 { - return float64(v.nm.DeletedSize()) / float64(v.ContentSize()) -} - -func (v *Volume) Compact() error { - glog.V(3).Infof("Compacting ...") - //no need to lock for copy on write - //v.accessLock.Lock() - //defer v.accessLock.Unlock() - //glog.V(3).Infof("Got Compaction lock...") - - filePath := v.FileName() - glog.V(3).Infof("creating copies for volume %d ...", v.Id) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") -} -func (v *Volume) commitCompact() error { - glog.V(3).Infof("Committing 
vacuuming...") - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - glog.V(3).Infof("Got Committing lock...") - v.nm.Close() - _ = v.dataFile.Close() - var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return e - } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return e - } - //glog.V(3).Infof("Pretending to be vacuuming...") - //time.Sleep(20 * time.Second) - glog.V(3).Infof("Loading Commit file...") - if e = v.load(true, false, v.needleMapKind); e != nil { - return e - } - return nil -} - -func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { - var ( - dst, idx *os.File - ) - if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { - return - } - defer dst.Close() - - if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { - return - } - defer idx.Close() - - nm := NewNeedleMap(idx) - new_offset := int64(SuperBlockSize) - - now := uint64(time.Now().Unix()) - - err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, - func(superBlock SuperBlock) error { - superBlock.CompactRevision++ - _, err = dst.Write(superBlock.Bytes()) - return err - }, true, func(n *Needle, offset int64) error { - if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { - return nil - } - nv, ok := v.nm.Get(n.Id) - glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) - if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { - return fmt.Errorf("cannot put needle: %s", err) - } - if _, err = n.Append(dst, v.Version()); err != nil { - return fmt.Errorf("cannot append needle: %s", err) - } - new_offset += n.DiskSize() - glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size) - } - return nil - }) - - return -} diff --git a/go/storage/volume_version.go b/go/storage/volume_version.go deleted file mode 100644 index 2e9f58aa2..000000000 --- a/go/storage/volume_version.go +++ /dev/null @@ -1,9 +0,0 @@ -package storage - -type Version uint8 - -const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 -) |
