Diffstat (limited to 'weed/storage')
29 files changed, 3246 insertions, 0 deletions
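The .idx index files read and written throughout this changeset share one fixed 16-byte entry layout: an 8-byte needle key, a 4-byte offset counted in NeedlePaddingSize (8-byte) units, and a 4-byte size. A minimal standalone sketch of that layout follows; the big-endian byte order is an assumption matching the weed/util helpers these files call:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeIndexEntry packs one needle index entry the way appendToIndexFile
// (needle_map.go below) does: 8-byte key, 4-byte offset, 4-byte size.
func encodeIndexEntry(key uint64, offset, size uint32) []byte {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[0:8], key)
	binary.BigEndian.PutUint32(b[8:12], offset)
	binary.BigEndian.PutUint32(b[12:16], size)
	return b
}

// decodeIndexEntry mirrors idxFileEntry (needle_map.go below).
func decodeIndexEntry(b []byte) (key uint64, offset, size uint32) {
	return binary.BigEndian.Uint64(b[0:8]),
		binary.BigEndian.Uint32(b[8:12]),
		binary.BigEndian.Uint32(b[12:16])
}

func main() {
	entry := encodeIndexEntry(0x4ed4, 42, 1024)
	key, offset, size := decodeIndexEntry(entry)
	// The stored offset is in 8-byte units, so the byte offset is offset*8;
	// that is how a 4-byte field can address 4GiB*8 = 32GiB per volume.
	fmt.Println(key, offset*8, size)
}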
diff --git a/weed/storage/compact_map.go b/weed/storage/compact_map.go new file mode 100644 index 000000000..d4438d044 --- /dev/null +++ b/weed/storage/compact_map.go @@ -0,0 +1,207 @@ +package storage + +import ( + "strconv" + "sync" +) + +type NeedleValue struct { + Key Key + Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 `comment:"Size of the data portion"` +} + +const ( + batch = 100000 +) + +type Key uint64 + +func (k Key) String() string { + return strconv.FormatUint(uint64(k), 10) +} + +type CompactSection struct { + sync.RWMutex + values []NeedleValue + overflow map[Key]NeedleValue + start Key + end Key + counter int +} + +func NewCompactSection(start Key) *CompactSection { + return &CompactSection{ + values: make([]NeedleValue, batch), + overflow: make(map[Key]NeedleValue), + start: start, + } +} + +//return old entry size +func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { + ret := uint32(0) + if key > cs.end { + cs.end = key + } + cs.Lock() + if i := cs.binarySearchValues(key); i >= 0 { + ret = cs.values[i].Size + //println("key", key, "old size", ret) + cs.values[i].Offset, cs.values[i].Size = offset, size + } else { + needOverflow := cs.counter >= batch + needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key + if needOverflow { + //println("start", cs.start, "counter", cs.counter, "key", key) + if oldValue, found := cs.overflow[key]; found { + ret = oldValue.Size + } + cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size} + } else { + p := &cs.values[cs.counter] + p.Key, p.Offset, p.Size = key, offset, size + //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) + cs.counter++ + } + } + cs.Unlock() + return ret +} + +//return old entry size +func (cs *CompactSection) Delete(key Key) uint32 { + cs.Lock() + ret := uint32(0) + if i := cs.binarySearchValues(key); i >= 0 { + if cs.values[i].Size > 0 { + ret = cs.values[i].Size + cs.values[i].Size = 0 + } + } + if v, found := cs.overflow[key]; found { + delete(cs.overflow, key) + ret = v.Size + } + cs.Unlock() + return ret +} +func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { + cs.RLock() + if v, ok := cs.overflow[key]; ok { + cs.RUnlock() + return &v, true + } + if i := cs.binarySearchValues(key); i >= 0 { + cs.RUnlock() + return &cs.values[i], true + } + cs.RUnlock() + return nil, false +} +func (cs *CompactSection) binarySearchValues(key Key) int { + l, h := 0, cs.counter-1 + if h >= 0 && cs.values[h].Key < key { + return -2 + } + //println("looking for key", key) + for l <= h { + m := (l + h) / 2 + //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size) + if cs.values[m].Key < key { + l = m + 1 + } else if key < cs.values[m].Key { + h = m - 1 + } else { + //println("found", m) + return m + } + } + return -1 +} + +//This map assumes mostly inserting increasing keys +type CompactMap struct { + list []*CompactSection +} + +func NewCompactMap() CompactMap { + return CompactMap{} +} + +func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + //println(x, "creating", len(cm.list), "section, starting", key) + cm.list = append(cm.list, NewCompactSection(key)) + x = len(cm.list) - 1 + //keep compact section sorted by start + for x > 0 { + if cm.list[x-1].start > cm.list[x].start { + cm.list[x-1], cm.list[x] = cm.list[x], cm.list[x-1] + x = x - 1 + } else { + break + } + } + } + return 
cm.list[x].Set(key, offset, size) +} +func (cm *CompactMap) Delete(key Key) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return uint32(0) + } + return cm.list[x].Delete(key) +} +func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return nil, false + } + return cm.list[x].Get(key) +} +func (cm *CompactMap) binarySearchCompactSection(key Key) int { + l, h := 0, len(cm.list)-1 + if h < 0 { + return -5 + } + if cm.list[h].start <= key { + if cm.list[h].counter < batch || key <= cm.list[h].end { + return h + } + return -4 + } + for l <= h { + m := (l + h) / 2 + if key < cm.list[m].start { + h = m - 1 + } else { // cm.list[m].start <= key + if cm.list[m+1].start <= key { + l = m + 1 + } else { + return m + } + } + } + return -3 +} + +// Visit visits all entries or stop if any error when visiting +func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { + for _, cs := range cm.list { + for _, v := range cs.overflow { + if err := visit(v); err != nil { + return err + } + } + for _, v := range cs.values { + if _, found := cs.overflow[v.Key]; !found { + if err := visit(v); err != nil { + return err + } + } + } + } + return nil +} diff --git a/weed/storage/compact_map_perf_test.go b/weed/storage/compact_map_perf_test.go new file mode 100644 index 000000000..cc7669139 --- /dev/null +++ b/weed/storage/compact_map_perf_test.go @@ -0,0 +1,45 @@ +package storage + +import ( + "log" + "os" + "testing" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func TestMemoryUsage(t *testing.T) { + + indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + +} + +func LoadNewNeedleMap(file *os.File) CompactMap { + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + glog.V(0).Infoln("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } + + count, e = file.Read(bytes) + } + return m +} diff --git a/weed/storage/compact_map_test.go b/weed/storage/compact_map_test.go new file mode 100644 index 000000000..1ccb48edb --- /dev/null +++ b/weed/storage/compact_map_test.go @@ -0,0 +1,77 @@ +package storage + +import ( + "testing" +) + +func TestIssue52(t *testing.T) { + m := NewCompactMap() + m.Set(Key(10002), 10002, 10002) + if element, ok := m.Get(Key(10002)); ok { + println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) + } + m.Set(Key(10001), 10001, 10001) + if element, ok := m.Get(Key(10002)); ok { + println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) + } else { + t.Fatal("key 10002 missing after setting 10001") + } +} + +func TestXYZ(t *testing.T) { + m := NewCompactMap() + for i := uint32(0); i < 100*batch; i += 2 { + m.Set(Key(i), i, i) + } + + for i := uint32(0); i < 100*batch; i += 37 { + m.Delete(Key(i)) + } + + for i := uint32(0); i < 10*batch; i += 3 { + m.Set(Key(i), i+11, i+5) + } + + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size) + // } + // } + + for i := uint32(0); i < 
10*batch; i++ { + v, ok := m.Get(Key(i)) + if i%3 == 0 { + if !ok { + t.Fatal("key", i, "missing!") + } + if v.Size != i+5 { + t.Fatal("key", i, "size", v.Size) + } + } else if i%37 == 0 { + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v.Size != i { + t.Fatal("key", i, "size", v.Size) + } + } + } + + for i := uint32(10 * batch); i < 100*batch; i++ { + v, ok := m.Get(Key(i)) + if i%37 == 0 { + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v == nil { + t.Fatal("key", i, "missing") + } + if v.Size != i { + t.Fatal("key", i, "size", v.Size) + } + } + } + +} diff --git a/weed/storage/crc.go b/weed/storage/crc.go new file mode 100644 index 000000000..494937784 --- /dev/null +++ b/weed/storage/crc.go @@ -0,0 +1,30 @@ +package storage + +import ( + "fmt" + "github.com/klauspost/crc32" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +var table = crc32.MakeTable(crc32.Castagnoli) + +type CRC uint32 + +func NewCRC(b []byte) CRC { + return CRC(0).Update(b) +} + +func (c CRC) Update(b []byte) CRC { + return CRC(crc32.Update(uint32(c), table, b)) +} + +func (c CRC) Value() uint32 { + return uint32(c>>15|c<<17) + 0xa282ead8 +} + +func (n *Needle) Etag() string { + bits := make([]byte, 4) + util.Uint32toBytes(bits, uint32(n.Checksum)) + return fmt.Sprintf("\"%x\"", bits) +} diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go new file mode 100644 index 000000000..cc3c83b63 --- /dev/null +++ b/weed/storage/disk_location.go @@ -0,0 +1,73 @@ +package storage + +import ( + "io/ioutil" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type DiskLocation struct { + Directory string + MaxVolumeCount int + volumes map[VolumeId]*Volume +} + +func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { + location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} + location.volumes = make(map[VolumeId]*Volume) + return location +} + +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { + + if dirs, err := ioutil.ReadDir(l.Directory); err == nil { + for _, dir := range dirs { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + collection := "" + base := name[:len(name)-len(".dat")] + i := strings.LastIndex(base, "_") + if i > 0 { + collection, base = base[0:i], base[i+1:] + } + if vid, err := NewVolumeId(base); err == nil { + if l.volumes[vid] == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { + l.volumes[vid] = v + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) + } else { + glog.V(0).Infof("new volume %s error %s", name, e) + } + } + } + } + } + } + glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) +} + +func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { + for k, v := range l.volumes { + if v.Collection == collection { + e = l.deleteVolumeById(k) + if e != nil { + return + } + } + } + return +} + +func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { + v, ok := l.volumes[vid] + if !ok { + return + } + e = v.Destroy() + if e != nil { + return + } + delete(l.volumes, vid) + return +} diff --git a/weed/storage/file_id.go b/weed/storage/file_id.go new file mode 100644 index 000000000..4cfdb16fa --- 
/dev/null +++ b/weed/storage/file_id.go @@ -0,0 +1,43 @@ +package storage + +import ( + "encoding/hex" + "errors" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type FileId struct { + VolumeId VolumeId + Key uint64 + Hashcode uint32 +} + +func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId { + return &FileId{VolumeId: VolumeId, Key: n.Id, Hashcode: n.Cookie} +} +func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId { + return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} +} +func ParseFileId(fid string) (*FileId, error) { + a := strings.Split(fid, ",") + if len(a) != 2 { + glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a)) + return nil, errors.New("Invalid fid " + fid) + } + vid_string, key_hash_string := a[0], a[1] + volumeId, _ := NewVolumeId(vid_string) + key, hash, e := ParseKeyHash(key_hash_string) + return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e +} +func (n *FileId) String() string { + bytes := make([]byte, 12) + util.Uint64toBytes(bytes[0:8], n.Key) + util.Uint32toBytes(bytes[8:12], n.Hashcode) + nonzero_index := 0 + for ; bytes[nonzero_index] == 0; nonzero_index++ { + } + return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:]) +} diff --git a/weed/storage/needle.go b/weed/storage/needle.go new file mode 100644 index 000000000..29549b323 --- /dev/null +++ b/weed/storage/needle.go @@ -0,0 +1,231 @@ +package storage + +import ( + "fmt" + "io/ioutil" + "mime" + "net/http" + "path" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/images" + "github.com/chrislusf/seaweedfs/weed/operation" +) + +const ( + NeedleHeaderSize = 16 //should never change this + NeedlePaddingSize = 8 + NeedleChecksumSize = 4 + MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 +) + +/* +* A Needle means a uploaded and stored file. +* Needle file size is limited to 4GB for now. 
+ */ +type Needle struct { + Cookie uint32 `comment:"random number to mitigate brute force lookups"` + Id uint64 `comment:"needle id"` + Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` + + DataSize uint32 `comment:"Data size"` //version2 + Data []byte `comment:"The actual file data"` + Flags byte `comment:"boolean flags"` //version2 + NameSize uint8 //version2 + Name []byte `comment:"maximum 256 characters"` //version2 + MimeSize uint8 //version2 + Mime []byte `comment:"maximum 256 characters"` //version2 + LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk + Ttl *TTL + + Checksum CRC `comment:"CRC32 to check integrity"` + Padding []byte `comment:"Aligned to 8 bytes"` + + rawBlock *Block // underlying supporing []byte, fetched and released into a pool +} + +func (n *Needle) String() (str string) { + str = fmt.Sprintf("Cookie:%d, Id:%d, Size:%d, DataSize:%d, Name: %s, Mime: %s", n.Cookie, n.Id, n.Size, n.DataSize, n.Name, n.Mime) + return +} + +func ParseUpload(r *http.Request) ( + fileName string, data []byte, mimeType string, isGzipped bool, + modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { + form, fe := r.MultipartReader() + if fe != nil { + glog.V(0).Infoln("MultipartReader [ERROR]", fe) + e = fe + return + } + + //first multi-part item + part, fe := form.NextPart() + if fe != nil { + glog.V(0).Infoln("Reading Multi part [ERROR]", fe) + e = fe + return + } + + fileName = part.FileName() + if fileName != "" { + fileName = path.Base(fileName) + } + + data, e = ioutil.ReadAll(part) + if e != nil { + glog.V(0).Infoln("Reading Content [ERROR]", e) + return + } + + //if the filename is empty string, do a search on the other multi-part items + for fileName == "" { + part2, fe := form.NextPart() + if fe != nil { + break // no more or on error, just safely break + } + + fName := part2.FileName() + + //found the first <file type> multi-part has filename + if fName != "" { + data2, fe2 := ioutil.ReadAll(part2) + if fe2 != nil { + glog.V(0).Infoln("Reading Content [ERROR]", fe2) + e = fe2 + return + } + + //update + data = data2 + fileName = path.Base(fName) + break + } + } + + dotIndex := strings.LastIndex(fileName, ".") + ext, mtype := "", "" + if dotIndex > 0 { + ext = strings.ToLower(fileName[dotIndex:]) + mtype = mime.TypeByExtension(ext) + } + contentType := part.Header.Get("Content-Type") + if contentType != "" && mtype != contentType { + mimeType = contentType //only return mime type if not deductable + mtype = contentType + } + if part.Header.Get("Content-Encoding") == "gzip" { + isGzipped = true + } else if operation.IsGzippable(ext, mtype) { + if data, e = operation.GzipData(data); e != nil { + return + } + isGzipped = true + } + if ext == ".gz" { + isGzipped = true + } + if strings.HasSuffix(fileName, ".gz") && + !strings.HasSuffix(fileName, ".tar.gz") { + fileName = fileName[:len(fileName)-3] + } + modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + ttl, _ = ReadTTL(r.FormValue("ttl")) + isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) + return +} +func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { + fname, mimeType, isGzipped, isChunkedFile := "", "", false, false + n = new(Needle) + fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) + if e != nil { + return + } + if len(fname) < 256 { + n.Name = []byte(fname) + n.SetHasName() + } + if len(mimeType) < 256 { + n.Mime = []byte(mimeType) + n.SetHasMime() + } + if isGzipped 
{ + n.SetGzipped() + } + if n.LastModified == 0 { + n.LastModified = uint64(time.Now().Unix()) + } + n.SetHasLastModifiedDate() + if n.Ttl != EMPTY_TTL { + n.SetHasTtl() + } + + if isChunkedFile { + n.SetIsChunkManifest() + } + + if fixJpgOrientation { + loweredName := strings.ToLower(fname) + if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") { + n.Data = images.FixJpgOrientation(n.Data) + } + } + + n.Checksum = NewCRC(n.Data) + + commaSep := strings.LastIndex(r.URL.Path, ",") + dotSep := strings.LastIndex(r.URL.Path, ".") + fid := r.URL.Path[commaSep+1:] + if dotSep > 0 { + fid = r.URL.Path[commaSep+1 : dotSep] + } + + e = n.ParsePath(fid) + + return +} +func (n *Needle) ParsePath(fid string) (err error) { + length := len(fid) + if length <= 8 { + return fmt.Errorf("Invalid fid: %s", fid) + } + delta := "" + deltaIndex := strings.LastIndex(fid, "_") + if deltaIndex > 0 { + fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] + } + n.Id, n.Cookie, err = ParseKeyHash(fid) + if err != nil { + return err + } + if delta != "" { + if d, e := strconv.ParseUint(delta, 10, 64); e == nil { + n.Id += d + } else { + return e + } + } + return err +} + +func ParseKeyHash(key_hash_string string) (uint64, uint32, error) { + if len(key_hash_string) <= 8 { + return 0, 0, fmt.Errorf("KeyHash is too short.") + } + if len(key_hash_string) > 24 { + return 0, 0, fmt.Errorf("KeyHash is too long.") + } + split := len(key_hash_string) - 8 + key, err := strconv.ParseUint(key_hash_string[:split], 16, 64) + if err != nil { + return 0, 0, fmt.Errorf("Parse key error: %v", err) + } + hash, err := strconv.ParseUint(key_hash_string[split:], 16, 32) + if err != nil { + return 0, 0, fmt.Errorf("Parse hash error: %v", err) + } + return key, uint32(hash), nil +} diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go new file mode 100644 index 000000000..ae35a48ba --- /dev/null +++ b/weed/storage/needle_byte_cache.go @@ -0,0 +1,75 @@ +package storage + +import ( + "fmt" + "os" + "sync/atomic" + + "github.com/hashicorp/golang-lru" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + bytesCache *lru.Cache + bytesPool *util.BytesPool +) + +/* +There are one level of caching, and one level of pooling. + +In pooling, all []byte are fetched and returned to the pool bytesPool. 
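+Each Block starts with refCount 2: one reference held by the LRU cache and one by the Needle that reads it; the []byte is returned to bytesPool only once both the eviction callback and Needle.ReleaseMemory have dropped the count to zero.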
+ +In caching, the string~[]byte mapping is cached +*/ +func init() { + bytesPool = util.NewBytesPool() + bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) { + value.(*Block).decreaseReference() + }) +} + +type Block struct { + Bytes []byte + refCount int32 +} + +func (block *Block) decreaseReference() { + if atomic.AddInt32(&block.refCount, -1) == 0 { + bytesPool.Put(block.Bytes) + } +} +func (block *Block) increaseReference() { + atomic.AddInt32(&block.refCount, 1) +} + +// get bytes from the LRU cache of []byte first, then from the bytes pool +// when []byte in LRU cache is evicted, it will be put back to the bytes pool +func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) { + // check cache, return if found + cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) + if obj, found := bytesCache.Get(cacheKey); found { + block = obj.(*Block) + block.increaseReference() + dataSlice = block.Bytes[0:readSize] + return dataSlice, block, nil + } + + // get the []byte from pool + b := bytesPool.Get(readSize) + // refCount = 2, one by the bytesCache, one by the actual needle object + block = &Block{Bytes: b, refCount: 2} + dataSlice = block.Bytes[0:readSize] + _, err = r.ReadAt(dataSlice, offset) + bytesCache.Add(cacheKey, block) + return dataSlice, block, err +} + +func (n *Needle) ReleaseMemory() { + if n.rawBlock != nil { + n.rawBlock.decreaseReference() + } +} +func ReleaseBytes(b []byte) { + bytesPool.Put(b) +} diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go new file mode 100644 index 000000000..05bc6e86c --- /dev/null +++ b/weed/storage/needle_map.go @@ -0,0 +1,123 @@ +package storage + +import ( + "fmt" + "io/ioutil" + "os" + "sync" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +type NeedleMapType int + +const ( + NeedleMapInMemory NeedleMapType = iota + NeedleMapLevelDb + NeedleMapBoltDb +) + +type NeedleMapper interface { + Put(key uint64, offset uint32, size uint32) error + Get(key uint64) (element *NeedleValue, ok bool) + Delete(key uint64) error + Close() + Destroy() error + ContentSize() uint64 + DeletedSize() uint64 + FileCount() int + DeletedCount() int + MaxFileKey() uint64 + IndexFileSize() uint64 + IndexFileContent() ([]byte, error) + IndexFileName() string +} + +type baseNeedleMapper struct { + indexFile *os.File + indexFileAccessLock sync.Mutex + + mapMetric +} + +func (nm *baseNeedleMapper) IndexFileSize() uint64 { + stat, err := nm.indexFile.Stat() + if err == nil { + return uint64(stat.Size()) + } + return 0 +} + +func (nm *baseNeedleMapper) IndexFileName() string { + return nm.indexFile.Name() +} + +func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { + key = util.BytesToUint64(bytes[:8]) + offset = util.BytesToUint32(bytes[8:12]) + size = util.BytesToUint32(bytes[12:16]) + return +} +func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + + nm.indexFileAccessLock.Lock() + defer nm.indexFileAccessLock.Unlock() + if _, err := nm.indexFile.Seek(0, 2); err != nil { + return fmt.Errorf("cannot seek end of indexfile %s: %v", + nm.indexFile.Name(), err) + } + _, err := nm.indexFile.Write(bytes) + return err +} +func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) { + nm.indexFileAccessLock.Lock() + defer nm.indexFileAccessLock.Unlock() + 
return ioutil.ReadFile(nm.indexFile.Name()) +} + +type mapMetric struct { + indexFile *os.File + + DeletionCounter int `json:"DeletionCounter"` + FileCounter int `json:"FileCounter"` + DeletionByteCounter uint64 `json:"DeletionByteCounter"` + FileByteCounter uint64 `json:"FileByteCounter"` + MaximumFileKey uint64 `json:"MaxFileKey"` +} + +func (mm *mapMetric) logDelete(deletedByteCount uint32) { + mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) + mm.DeletionCounter++ +} + +func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { + if key > mm.MaximumFileKey { + mm.MaximumFileKey = key + } + mm.FileCounter++ + mm.FileByteCounter = mm.FileByteCounter + uint64(newSize) + if oldSize > 0 { + mm.DeletionCounter++ + mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) + } +} + +func (mm mapMetric) ContentSize() uint64 { + return mm.FileByteCounter +} +func (mm mapMetric) DeletedSize() uint64 { + return mm.DeletionByteCounter +} +func (mm mapMetric) FileCount() int { + return mm.FileCounter +} +func (mm mapMetric) DeletedCount() int { + return mm.DeletionCounter +} +func (mm mapMetric) MaxFileKey() uint64 { + return mm.MaximumFileKey +} diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go new file mode 100644 index 000000000..bd3edf28d --- /dev/null +++ b/weed/storage/needle_map_boltdb.go @@ -0,0 +1,165 @@ +package storage + +import ( + "fmt" + "os" + + "github.com/boltdb/bolt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type BoltDbNeedleMap struct { + dbFileName string + db *bolt.DB + baseNeedleMapper +} + +var boltdbBucket = []byte("weed") + +func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { + m = &BoltDbNeedleMap{dbFileName: dbFileName} + m.indexFile = indexFile + if !isBoltDbFresh(dbFileName, indexFile) { + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + generateBoltDbFile(dbFileName, indexFile) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", dbFileName) + if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil { + return + } + glog.V(1).Infof("Loading %s...", indexFile.Name()) + nm, indexLoadError := LoadNeedleMap(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = nm.mapMetric + return +} + +func isBoltDbFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbLogFile, err := os.Open(dbFileName) + if err != nil { + return false + } + defer dbLogFile.Close() + dbStat, dbStatErr := dbLogFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func generateBoltDbFile(dbFileName string, indexFile *os.File) error { + db, err := bolt.Open(dbFileName, 0644, nil) + if err != nil { + return err + } + defer db.Close() + return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + if offset > 0 { + boltDbWrite(db, key, offset, size) + } else { + boltDbDelete(db, key) + } + return nil + }) +} + +func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + bytes := make([]byte, 8) + var data []byte + util.Uint64toBytes(bytes, key) + err := m.db.View(func(tx *bolt.Tx) error { + bucket := 
tx.Bucket(boltdbBucket) + if bucket == nil { + return fmt.Errorf("Bucket %q not found!", boltdbBucket) + } + + data = bucket.Get(bytes) + return nil + }) + + if err != nil || len(data) != 8 { + return nil, false + } + offset := util.BytesToUint32(data[0:4]) + size := util.BytesToUint32(data[4:8]) + return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true +} + +func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { + var oldSize uint32 + if oldNeedle, ok := m.Get(key); ok { + oldSize = oldNeedle.Size + } + m.logPut(key, oldSize, size) + // write to index file first + if err := m.appendToIndexFile(key, offset, size); err != nil { + return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) + } + return boltDbWrite(m.db, key, offset, size) +} + +func boltDbWrite(db *bolt.DB, + key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) + if err != nil { + return err + } + + err = bucket.Put(bytes[0:8], bytes[8:16]) + if err != nil { + return err + } + return nil + }) +} +func boltDbDelete(db *bolt.DB, key uint64) error { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) + if err != nil { + return err + } + + err = bucket.Delete(bytes) + if err != nil { + return err + } + return nil + }) +} + +func (m *BoltDbNeedleMap) Delete(key uint64) error { + if oldNeedle, ok := m.Get(key); ok { + m.logDelete(oldNeedle.Size) + } + // write to index file first + if err := m.appendToIndexFile(key, 0, 0); err != nil { + return err + } + return boltDbDelete(m.db, key) +} + +func (m *BoltDbNeedleMap) Close() { + m.db.Close() +} + +func (m *BoltDbNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.dbFileName) +} diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go new file mode 100644 index 000000000..1789dbb12 --- /dev/null +++ b/weed/storage/needle_map_leveldb.go @@ -0,0 +1,134 @@ +package storage + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/syndtr/goleveldb/leveldb" +) + +type LevelDbNeedleMap struct { + dbFileName string + db *leveldb.DB + baseNeedleMapper +} + +func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { + m = &LevelDbNeedleMap{dbFileName: dbFileName} + m.indexFile = indexFile + if !isLevelDbFresh(dbFileName, indexFile) { + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + generateLevelDbFile(dbFileName, indexFile) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", dbFileName) + if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil { + return + } + glog.V(1).Infof("Loading %s...", indexFile.Name()) + nm, indexLoadError := LoadNeedleMap(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = nm.mapMetric + return +} + +func isLevelDbFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG")) + if err != nil { + return false + } + defer 
dbLogFile.Close() + dbStat, dbStatErr := dbLogFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func generateLevelDbFile(dbFileName string, indexFile *os.File) error { + db, err := leveldb.OpenFile(dbFileName, nil) + if err != nil { + return err + } + defer db.Close() + return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + if offset > 0 { + levelDbWrite(db, key, offset, size) + } else { + levelDbDelete(db, key) + } + return nil + }) +} + +func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + data, err := m.db.Get(bytes, nil) + if err != nil || len(data) != 8 { + return nil, false + } + offset := util.BytesToUint32(data[0:4]) + size := util.BytesToUint32(data[4:8]) + return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true +} + +func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { + var oldSize uint32 + if oldNeedle, ok := m.Get(key); ok { + oldSize = oldNeedle.Size + } + m.logPut(key, oldSize, size) + // write to index file first + if err := m.appendToIndexFile(key, offset, size); err != nil { + return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) + } + return levelDbWrite(m.db, key, offset, size) +} + +func levelDbWrite(db *leveldb.DB, + key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil { + return fmt.Errorf("failed to write leveldb: %v", err) + } + return nil +} +func levelDbDelete(db *leveldb.DB, key uint64) error { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + return db.Delete(bytes, nil) +} + +func (m *LevelDbNeedleMap) Delete(key uint64) error { + if oldNeedle, ok := m.Get(key); ok { + m.logDelete(oldNeedle.Size) + } + // write to index file first + if err := m.appendToIndexFile(key, 0, 0); err != nil { + return err + } + return levelDbDelete(m.db, key) +} + +func (m *LevelDbNeedleMap) Close() { + m.db.Close() +} + +func (m *LevelDbNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.dbFileName) +} diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go new file mode 100644 index 000000000..f2f4835df --- /dev/null +++ b/weed/storage/needle_map_memory.go @@ -0,0 +1,106 @@ +package storage + +import ( + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type NeedleMap struct { + m CompactMap + + baseNeedleMapper +} + +func NewNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: NewCompactMap(), + } + nm.indexFile = file + return nm +} + +const ( + RowsToRead = 1024 +) + +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { + nm := NewNeedleMap(file) + e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { + if key > nm.MaximumFileKey { + nm.MaximumFileKey = key + } + nm.FileCounter++ + nm.FileByteCounter = nm.FileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.DeletionCounter++ + nm.DeletionByteCounter = 
nm.DeletionByteCounter + uint64(oldSize) + } + } else { + oldSize := nm.m.Delete(Key(key)) + glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + return nil + }) + glog.V(1).Infoln("max file key:", nm.MaximumFileKey) + return nm, e +} + +// walks through the index file, calls fn function with each key, offset, size +// stops with the error returned by the fn function +func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { + var readerOffset int64 + bytes := make([]byte, 16*RowsToRead) + count, e := r.ReadAt(bytes, readerOffset) + glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + readerOffset += int64(count) + var ( + key uint64 + offset, size uint32 + i int + ) + + for count > 0 && e == nil || e == io.EOF { + for i = 0; i+16 <= count; i += 16 { + key, offset, size = idxFileEntry(bytes[i : i+16]) + if e = fn(key, offset, size); e != nil { + return e + } + } + if e == io.EOF { + return nil + } + count, e = r.ReadAt(bytes, readerOffset) + glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + readerOffset += int64(count) + } + return e +} + +func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { + oldSize := nm.m.Set(Key(key), offset, size) + nm.logPut(key, oldSize, size) + return nm.appendToIndexFile(key, offset, size) +} +func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + element, ok = nm.m.Get(Key(key)) + return +} +func (nm *NeedleMap) Delete(key uint64) error { + deletedBytes := nm.m.Delete(Key(key)) + nm.logDelete(deletedBytes) + return nm.appendToIndexFile(key, 0, 0) +} +func (nm *NeedleMap) Close() { + _ = nm.indexFile.Close() +} +func (nm *NeedleMap) Destroy() error { + nm.Close() + return os.Remove(nm.indexFile.Name()) +} diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go new file mode 100644 index 000000000..2f26147d6 --- /dev/null +++ b/weed/storage/needle_read_write.go @@ -0,0 +1,291 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + FlagGzip = 0x01 + FlagHasName = 0x02 + FlagHasMime = 0x04 + FlagHasLastModifiedDate = 0x08 + FlagHasTtl = 0x10 + FlagIsChunkManifest = 0x80 + LastModifiedBytesLength = 5 + TtlBytesLength = 2 +) + +func (n *Needle) DiskSize() int64 { + padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize) + return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize +} +func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { + if s, ok := w.(io.Seeker); ok { + if end, e := s.Seek(0, 1); e == nil { + defer func(s io.Seeker, off int64) { + if err != nil { + if _, e = s.Seek(off, 0); e != nil { + glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e) + } + } + }(s, end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + } + switch version { + case Version1: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.Size = uint32(len(n.Data)) + size = n.Size + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + 
return + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return + case Version2: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) + if n.DataSize > 0 { + n.Size = 4 + n.DataSize + 1 + if n.HasName() { + n.Size = n.Size + 1 + uint32(n.NameSize) + } + if n.HasMime() { + n.Size = n.Size + 1 + uint32(n.MimeSize) + } + if n.HasLastModifiedDate() { + n.Size = n.Size + LastModifiedBytesLength + } + if n.HasTtl() { + n.Size = n.Size + TtlBytesLength + } + } else { + n.Size = 0 + } + size = n.DataSize + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if n.DataSize > 0 { + util.Uint32toBytes(header[0:4], n.DataSize) + if _, err = w.Write(header[0:4]); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + util.Uint8toBytes(header[0:1], n.Flags) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if n.HasName() { + util.Uint8toBytes(header[0:1], n.NameSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Name); err != nil { + return + } + } + if n.HasMime() { + util.Uint8toBytes(header[0:1], n.MimeSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Mime); err != nil { + return + } + } + if n.HasLastModifiedDate() { + util.Uint64toBytes(header[0:8], n.LastModified) + if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil { + return + } + } + if n.HasTtl() && n.Ttl != nil { + n.Ttl.ToBytes(header[0:TtlBytesLength]) + if _, err = w.Write(header[0:TtlBytesLength]); err != nil { + return + } + } + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return n.DataSize, err + } + return 0, fmt.Errorf("Unsupported Version! (%d)", version) +} + +func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { + padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) + readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding + return getBytesForFileBlock(r, offset, int(readSize)) +} + +func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { + bytes, block, err := ReadNeedleBlob(r, offset, size) + if err != nil { + return err + } + n.rawBlock = block + n.ParseNeedleHeader(bytes) + if n.Size != size { + return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) + } + switch version { + case Version1: + n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] + case Version2: + n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) + } + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + newChecksum := NewCRC(n.Data) + if checksum != newChecksum.Value() { + return errors.New("CRC error! 
Data On Disk Corrupted") + } + n.Checksum = newChecksum + return nil +} +func (n *Needle) ParseNeedleHeader(bytes []byte) { + n.Cookie = util.BytesToUint32(bytes[0:4]) + n.Id = util.BytesToUint64(bytes[4:12]) + n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) +} +func (n *Needle) readNeedleDataVersion2(bytes []byte) { + index, lenBytes := 0, len(bytes) + if index < lenBytes { + n.DataSize = util.BytesToUint32(bytes[index : index+4]) + index = index + 4 + if int(n.DataSize)+index > lenBytes { + // this if clause is due to bug #87 and #93, fixed in v0.69 + // remove this clause later + return + } + n.Data = bytes[index : index+int(n.DataSize)] + index = index + int(n.DataSize) + n.Flags = bytes[index] + index = index + 1 + } + if index < lenBytes && n.HasName() { + n.NameSize = uint8(bytes[index]) + index = index + 1 + n.Name = bytes[index : index+int(n.NameSize)] + index = index + int(n.NameSize) + } + if index < lenBytes && n.HasMime() { + n.MimeSize = uint8(bytes[index]) + index = index + 1 + n.Mime = bytes[index : index+int(n.MimeSize)] + index = index + int(n.MimeSize) + } + if index < lenBytes && n.HasLastModifiedDate() { + n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) + index = index + LastModifiedBytesLength + } + if index < lenBytes && n.HasTtl() { + n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) + index = index + TtlBytesLength + } +} + +func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { + n = new(Needle) + if version == Version1 || version == Version2 { + bytes := make([]byte, NeedleHeaderSize) + var count int + count, err = r.ReadAt(bytes, offset) + if count <= 0 || err != nil { + return nil, 0, err + } + n.ParseNeedleHeader(bytes) + padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + bodyLength = n.Size + NeedleChecksumSize + padding + } + return +} + +//n should be a needle already read the header +//the input stream will read until next file entry +func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) { + if bodyLength <= 0 { + return nil + } + switch version { + case Version1: + bytes := make([]byte, bodyLength) + if _, err = r.ReadAt(bytes, offset); err != nil { + return + } + n.Data = bytes[:n.Size] + n.Checksum = NewCRC(n.Data) + case Version2: + bytes := make([]byte, bodyLength) + if _, err = r.ReadAt(bytes, offset); err != nil { + return + } + n.readNeedleDataVersion2(bytes[0:n.Size]) + n.Checksum = NewCRC(n.Data) + default: + err = fmt.Errorf("Unsupported Version! 
(%d)", version) + } + return +} + +func (n *Needle) IsGzipped() bool { + return n.Flags&FlagGzip > 0 +} +func (n *Needle) SetGzipped() { + n.Flags = n.Flags | FlagGzip +} +func (n *Needle) HasName() bool { + return n.Flags&FlagHasName > 0 +} +func (n *Needle) SetHasName() { + n.Flags = n.Flags | FlagHasName +} +func (n *Needle) HasMime() bool { + return n.Flags&FlagHasMime > 0 +} +func (n *Needle) SetHasMime() { + n.Flags = n.Flags | FlagHasMime +} +func (n *Needle) HasLastModifiedDate() bool { + return n.Flags&FlagHasLastModifiedDate > 0 +} +func (n *Needle) SetHasLastModifiedDate() { + n.Flags = n.Flags | FlagHasLastModifiedDate +} +func (n *Needle) HasTtl() bool { + return n.Flags&FlagHasTtl > 0 +} +func (n *Needle) SetHasTtl() { + n.Flags = n.Flags | FlagHasTtl +} + +func (n *Needle) IsChunkedManifest() bool { + return n.Flags&FlagIsChunkManifest > 0 +} + +func (n *Needle) SetIsChunkManifest() { + n.Flags = n.Flags | FlagIsChunkManifest +} diff --git a/weed/storage/needle_test.go b/weed/storage/needle_test.go new file mode 100644 index 000000000..c05afda2f --- /dev/null +++ b/weed/storage/needle_test.go @@ -0,0 +1,45 @@ +package storage + +import "testing" + +func TestParseKeyHash(t *testing.T) { + testcases := []struct { + KeyHash string + ID uint64 + Cookie uint32 + Err bool + }{ + // normal + {"4ed4c8116e41", 0x4ed4, 0xc8116e41, false}, + // cookie with leading zeros + {"4ed401116e41", 0x4ed4, 0x01116e41, false}, + // odd length + {"ed400116e41", 0xed4, 0x00116e41, false}, + // uint + {"fed4c8114ed4c811f0116e41", 0xfed4c8114ed4c811, 0xf0116e41, false}, + // err: too short + {"4ed4c811", 0, 0, true}, + // err: too long + {"4ed4c8114ed4c8114ed4c8111", 0, 0, true}, + // err: invalid character + {"helloworld", 0, 0, true}, + } + + for _, tc := range testcases { + if id, cookie, err := ParseKeyHash(tc.KeyHash); err != nil && !tc.Err { + t.Fatalf("Parse %s error: %v", tc.KeyHash, err) + } else if err == nil && tc.Err { + t.Fatalf("Parse %s expected error got nil", tc.KeyHash) + } else if id != tc.ID || cookie != tc.Cookie { + t.Fatalf("Parse %s wrong result. 
Expected: (%d, %d) got: (%d, %d)", tc.KeyHash, tc.ID, tc.Cookie, id, cookie) + } + } +} + +func BenchmarkParseKeyHash(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + ParseKeyHash("4ed44ed44ed44ed4c8116e41") + } +} diff --git a/weed/storage/replica_placement.go b/weed/storage/replica_placement.go new file mode 100644 index 000000000..c1aca52eb --- /dev/null +++ b/weed/storage/replica_placement.go @@ -0,0 +1,53 @@ +package storage + +import ( + "errors" + "fmt" +) + +type ReplicaPlacement struct { + SameRackCount int + DiffRackCount int + DiffDataCenterCount int +} + +func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { + rp := &ReplicaPlacement{} + for i, c := range t { + count := int(c - '0') + if 0 <= count && count <= 2 { + switch i { + case 0: + rp.DiffDataCenterCount = count + case 1: + rp.DiffRackCount = count + case 2: + rp.SameRackCount = count + } + } else { + return rp, errors.New("Unknown Replication Type:" + t) + } + } + return rp, nil +} + +func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { + return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b)) +} + +func (rp *ReplicaPlacement) Byte() byte { + ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount + return byte(ret) +} + +func (rp *ReplicaPlacement) String() string { + b := make([]byte, 3) + b[0] = byte(rp.DiffDataCenterCount + '0') + b[1] = byte(rp.DiffRackCount + '0') + b[2] = byte(rp.SameRackCount + '0') + return string(b) +} + +func (rp *ReplicaPlacement) GetCopyCount() int { + return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 +} diff --git a/weed/storage/replica_placement_test.go b/weed/storage/replica_placement_test.go new file mode 100644 index 000000000..9c2161e94 --- /dev/null +++ b/weed/storage/replica_placement_test.go @@ -0,0 +1,14 @@ +package storage + +import ( + "testing" +) + +func TestReplicaPlacemnetSerialDeserial(t *testing.T) { + rp, _ := NewReplicaPlacementFromString("001") + new_rp, _ := NewReplicaPlacementFromByte(rp.Byte()) + if rp.String() != new_rp.String() { + println("expected:", rp.String(), "actual:", new_rp.String()) + t.Fail() + } +} diff --git a/weed/storage/store.go b/weed/storage/store.go new file mode 100644 index 000000000..d44d6a863 --- /dev/null +++ b/weed/storage/store.go @@ -0,0 +1,340 @@ +package storage + +import ( + "encoding/json" + "errors" + "fmt" + "math/rand" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" +) + +const ( + MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes +) + +type MasterNodes struct { + nodes []string + lastNode int +} + +func (mn *MasterNodes) String() string { + return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode) +} + +func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { + mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} + return +} +func (mn *MasterNodes) reset() { + glog.V(4).Infof("Resetting master nodes: %v", mn) + if len(mn.nodes) > 1 && mn.lastNode >= 0 { + glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) + mn.lastNode = -mn.lastNode - 1 + } +} +func (mn *MasterNodes) findMaster() (string, error) { + if len(mn.nodes) == 0 { + return "", errors.New("No master node found!") + } + if mn.lastNode < 0 { + for _, m := range mn.nodes { + glog.V(4).Infof("Listing masters on %s", m) + if 
masters, e := operation.ListMasters(m); e == nil { + if len(masters) == 0 { + continue + } + mn.nodes = append(masters, m) + mn.lastNode = rand.Intn(len(mn.nodes)) + glog.V(2).Infof("current master nodes is %v", mn) + break + } else { + glog.V(4).Infof("Failed listing masters on %s: %v", m, e) + } + } + } + if mn.lastNode < 0 { + return "", errors.New("No master node available!") + } + return mn.nodes[mn.lastNode], nil +} + +/* + * A VolumeServer contains one Store + */ +type Store struct { + Ip string + Port int + PublicUrl string + Locations []*DiskLocation + dataCenter string //optional informaton, overwriting master setting if exists + rack string //optional information, overwriting master setting if exists + connected bool + volumeSizeLimit uint64 //read from the master + masterNodes *MasterNodes +} + +func (s *Store) String() (str string) { + str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes) + return +} + +func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s.Locations = make([]*DiskLocation, 0) + for i := 0; i < len(dirnames); i++ { + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) + location.loadExistingVolumes(needleMapKind) + s.Locations = append(s.Locations, location) + } + return +} +func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { + rt, e := NewReplicaPlacementFromString(replicaPlacement) + if e != nil { + return e + } + ttl, e := ReadTTL(ttlString) + if e != nil { + return e + } + for _, range_string := range strings.Split(volumeListString, ",") { + if strings.Index(range_string, "-") < 0 { + id_string := range_string + id, err := NewVolumeId(id_string) + if err != nil { + return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) + } + e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) + } else { + pair := strings.Split(range_string, "-") + start, start_err := strconv.ParseUint(pair[0], 10, 64) + if start_err != nil { + return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0]) + } + end, end_err := strconv.ParseUint(pair[1], 10, 64) + if end_err != nil { + return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) + } + for id := start; id <= end; id++ { + if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { + e = err + } + } + } + } + return e +} +func (s *Store) DeleteCollection(collection string) (e error) { + for _, location := range s.Locations { + e = location.DeleteCollectionFromDiskLocation(collection) + if e != nil { + return + } + } + return +} + +func (s *Store) findVolume(vid VolumeId) *Volume { + for _, location := range s.Locations { + if v, found := location.volumes[vid]; found { + return v + } + } + return nil +} +func (s *Store) findFreeLocation() (ret *DiskLocation) { + max := 0 + for _, location := range s.Locations { + currentFreeCount := location.MaxVolumeCount - len(location.volumes) + if currentFreeCount > max { + max = currentFreeCount + ret = location + } + } + return ret +} +func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { + if 
s.findVolume(vid) != nil { + return fmt.Errorf("Volume Id %d already exists!", vid) + } + if location := s.findFreeLocation(); location != nil { + glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", + location.Directory, vid, collection, replicaPlacement, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { + location.volumes[vid] = volume + return nil + } else { + return err + } + } + return fmt.Errorf("No more free space left") +} + +func (s *Store) Status() []*VolumeInfo { + var stats []*VolumeInfo + for _, location := range s.Locations { + for k, v := range location.volumes { + s := &VolumeInfo{ + Id: VolumeId(k), + Size: v.ContentSize(), + Collection: v.Collection, + ReplicaPlacement: v.ReplicaPlacement, + Version: v.Version(), + FileCount: v.nm.FileCount(), + DeleteCount: v.nm.DeletedCount(), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.readOnly, + Ttl: v.Ttl} + stats = append(stats, s) + } + } + sortVolumeInfos(stats) + return stats +} + +func (s *Store) SetDataCenter(dataCenter string) { + s.dataCenter = dataCenter +} +func (s *Store) SetRack(rack string) { + s.rack = rack +} + +func (s *Store) SetBootstrapMaster(bootstrapMaster string) { + s.masterNodes = NewMasterNodes(bootstrapMaster) +} +func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { + masterNode, e = s.masterNodes.findMaster() + if e != nil { + return + } + var volumeMessages []*operation.VolumeInformationMessage + maxVolumeCount := 0 + var maxFileKey uint64 + for _, location := range s.Locations { + maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + for k, v := range location.volumes { + if maxFileKey < v.nm.MaxFileKey() { + maxFileKey = v.nm.MaxFileKey() + } + if !v.expired(s.volumeSizeLimit) { + volumeMessage := &operation.VolumeInformationMessage{ + Id: proto.Uint32(uint32(k)), + Size: proto.Uint64(uint64(v.Size())), + Collection: proto.String(v.Collection), + FileCount: proto.Uint64(uint64(v.nm.FileCount())), + DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), + DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), + ReadOnly: proto.Bool(v.readOnly), + ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), + Version: proto.Uint32(uint32(v.Version())), + Ttl: proto.Uint32(v.Ttl.ToUint32()), + } + volumeMessages = append(volumeMessages, volumeMessage) + } else { + if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + location.deleteVolumeById(v.Id) + glog.V(0).Infoln("volume", v.Id, "is deleted.") + } else { + glog.V(0).Infoln("volume", v.Id, "is expired.") + } + } + } + } + + joinMessage := &operation.JoinMessage{ + IsInit: proto.Bool(!s.connected), + Ip: proto.String(s.Ip), + Port: proto.Uint32(uint32(s.Port)), + PublicUrl: proto.String(s.PublicUrl), + MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)), + MaxFileKey: proto.Uint64(maxFileKey), + DataCenter: proto.String(s.dataCenter), + Rack: proto.String(s.rack), + Volumes: volumeMessages, + } + + data, err := proto.Marshal(joinMessage) + if err != nil { + return "", "", err + } + + joinUrl := "http://" + masterNode + "/dir/join" + glog.V(4).Infof("Connecting to %s ...", joinUrl) + + jsonBlob, err := util.PostBytes(joinUrl, data) + if err != nil { + s.masterNodes.reset() + return "", "", err + } + var ret operation.JoinResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) + 
s.masterNodes.reset() + return masterNode, "", err + } + if ret.Error != "" { + s.masterNodes.reset() + return masterNode, "", errors.New(ret.Error) + } + s.volumeSizeLimit = ret.VolumeSizeLimit + secretKey = security.Secret(ret.SecretKey) + s.connected = true + return +} +func (s *Store) Close() { + for _, location := range s.Locations { + for _, v := range location.volumes { + v.Close() + } + } +} +func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { + if v := s.findVolume(i); v != nil { + if v.readOnly { + err = fmt.Errorf("Volume %d is read only", i) + return + } + if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { + size, err = v.write(n) + } else { + err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize()) + } + if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { + glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) + if _, _, e := s.SendHeartbeatToMaster(); e != nil { + glog.V(0).Infoln("error when reporting size:", e) + } + } + return + } + glog.V(0).Infoln("volume", i, "not found!") + err = fmt.Errorf("Volume %d not found!", i) + return +} +func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { + if v := s.findVolume(i); v != nil && !v.readOnly { + return v.delete(n) + } + return 0, nil +} +func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { + if v := s.findVolume(i); v != nil { + return v.readNeedle(n) + } + return 0, fmt.Errorf("Volume %v not found!", i) +} +func (s *Store) GetVolume(i VolumeId) *Volume { + return s.findVolume(i) +} + +func (s *Store) HasVolume(i VolumeId) bool { + v := s.findVolume(i) + return v != nil +} diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go new file mode 100644 index 000000000..03825c159 --- /dev/null +++ b/weed/storage/store_vacuum.go @@ -0,0 +1,44 @@ +package storage + +import ( + "fmt" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString), false + } + garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) + if e != nil { + return fmt.Errorf("garbageThreshold %s is not a valid float number", garbageThresholdString), false + } + if v := s.findVolume(vid); v != nil { + glog.V(3).Infoln(vid, "garbage level is", v.garbageLevel()) + return nil, garbageThreshold < v.garbageLevel() + } + return fmt.Errorf("volume id %d is not found during check compact", vid), false +} +func (s *Store) CompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + if v := s.findVolume(vid); v != nil { + return v.Compact() + } + return fmt.Errorf("volume id %d is not found during compact", vid) +} +func (s *Store) CommitCompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + if v := s.findVolume(vid); v != nil { + return v.commitCompact() + } + return fmt.Errorf("volume id %d is not found during commit compact", vid) +} diff --git a/weed/storage/volume.go b/weed/storage/volume.go new file mode 100644 index 000000000..d40bdc565 --- /dev/null +++ 
b/weed/storage/volume.go @@ -0,0 +1,430 @@ +package storage + +import ( + "bytes" + "errors" + "fmt" + "io" + "os" + "path" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type Volume struct { + Id VolumeId + dir string + Collection string + dataFile *os.File + nm NeedleMapper + needleMapKind NeedleMapType + readOnly bool + + SuperBlock + + dataFileAccessLock sync.Mutex + lastModifiedTime uint64 //unix time in seconds +} + +func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { + v = &Volume{dir: dirname, Collection: collection, Id: id} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} + v.needleMapKind = needleMapKind + e = v.load(true, true, needleMapKind) + return +} +func (v *Volume) String() string { + return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) +} + +func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) { + v = &Volume{dir: dirname, Collection: collection, Id: id} + v.SuperBlock = SuperBlock{} + v.needleMapKind = needleMapKind + e = v.load(false, false, needleMapKind) + return +} +func (v *Volume) FileName() (fileName string) { + if v.Collection == "" { + fileName = path.Join(v.dir, v.Id.String()) + } else { + fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String()) + } + return +} +func (v *Volume) DataFile() *os.File { + return v.dataFile +} +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error { + var e error + fileName := v.FileName() + + if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists { + if !canRead { + return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) + } + if canWrite { + v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.lastModifiedTime = uint64(modifiedTime.Unix()) + } else { + glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") + v.dataFile, e = os.Open(fileName + ".dat") + v.readOnly = true + } + } else { + if createDatIfMissing { + v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + } else { + return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) + } + } + + if e != nil { + if !os.IsPermission(e) { + return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e) + } + } + + if v.ReplicaPlacement == nil { + e = v.readSuperBlock() + } else { + e = v.maybeWriteSuperBlock() + } + if e == nil && alsoLoadIndex { + var indexFile *os.File + if v.readOnly { + glog.V(1).Infoln("open to read file", fileName+".idx") + if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil { + return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e) + } + } else { + glog.V(1).Infoln("open to write file", fileName+".idx") + if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil { + return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) + } + } + switch needleMapKind { + case NeedleMapInMemory: + glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) + if v.nm, e = LoadNeedleMap(indexFile); e != nil { + glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) + } + case NeedleMapLevelDb: + glog.V(0).Infoln("loading leveldb file", fileName+".ldb") + if v.nm, e = 
NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { + glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) + } + case NeedleMapBoltDb: + glog.V(0).Infoln("loading boltdb file", fileName+".bdb") + if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { + glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) + } + } + } + return e +} +func (v *Volume) Version() Version { + return v.SuperBlock.Version() +} +func (v *Volume) Size() int64 { + stat, e := v.dataFile.Stat() + if e == nil { + return stat.Size() + } + glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) + return -1 +} + +// Close cleanly shuts down this volume +func (v *Volume) Close() { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + v.nm.Close() + _ = v.dataFile.Close() +} + +func (v *Volume) NeedToReplicate() bool { + return v.ReplicaPlacement.GetCopyCount() > 1 +} + +// isFileUnchanged checks whether this needle to write is the same as the last one. +// It requires serialized access in the same volume. +func (v *Volume) isFileUnchanged(n *Needle) bool { + if v.Ttl.String() != "" { + return false + } + nv, ok := v.nm.Get(n.Id) + if ok && nv.Offset > 0 { + oldNeedle := new(Needle) + err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + glog.V(0).Infof("Failed to check updated file %v", err) + return false + } + defer oldNeedle.ReleaseMemory() + if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { + n.DataSize = oldNeedle.DataSize + return true + } + } + return false +} + +// Destroy removes everything related to this volume +func (v *Volume) Destroy() (err error) { + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + return + } + v.Close() + err = os.Remove(v.dataFile.Name()) + if err != nil { + return + } + err = v.nm.Destroy() + return +} + +// AppendBlob appends a blob to the end of the data file, used in replication +func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + return + } + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if offset, err = v.dataFile.Seek(0, 2); err != nil { + glog.V(0).Infof("failed to seek the end of file: %v", err) + return + } + //ensure file writing starting from aligned positions + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + if offset, err = v.dataFile.Seek(offset, 0); err != nil { + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) + return + } + } + //report write errors instead of silently dropping them + _, err = v.dataFile.Write(b) + return +} + +func (v *Volume) write(n *Needle) (size uint32, err error) { + glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + return + } + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if v.isFileUnchanged(n) { + size = n.DataSize + glog.V(4).Infof("needle is unchanged!") + return + } + var offset int64 + if offset, err = v.dataFile.Seek(0, 2); err != nil { + glog.V(0).Infof("failed to seek the end of file: %v", err) + return + } + + //ensure file writing starting from aligned positions + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + if offset, err = v.dataFile.Seek(offset, 0); err != nil { + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
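+ //the alignment seek failed; abort this write and surface the error to the caller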
+ return + } + } + + if size, err = n.Append(v.dataFile, v.Version()); err != nil { + if e := v.dataFile.Truncate(offset); e != nil { + err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e) + } + return + } + nv, ok := v.nm.Get(n.Id) + if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { + if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) + } + } + if v.lastModifiedTime < n.LastModified { + v.lastModifiedTime = n.LastModified + } + return +} + +func (v *Volume) delete(n *Needle) (uint32, error) { + glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String()) + if v.readOnly { + return 0, fmt.Errorf("%s is read-only", v.dataFile.Name()) + } + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + nv, ok := v.nm.Get(n.Id) + //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) + if ok { + size := nv.Size + if err := v.nm.Delete(n.Id); err != nil { + return size, err + } + if _, err := v.dataFile.Seek(0, 2); err != nil { + return size, err + } + n.Data = nil + _, err := n.Append(v.dataFile, v.Version()) + return size, err + } + return 0, nil +} + +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedle(n *Needle) (int, error) { + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset == 0 { + return -1, errors.New("Not Found") + } + err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + return 0, err + } + bytesRead := len(n.Data) + if !n.HasTtl() { + return bytesRead, nil + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { + return bytesRead, nil + } + n.ReleaseMemory() + return -1, errors.New("Not Found") +} + +func ScanVolumeFile(dirname string, collection string, id VolumeId, + needleMapKind NeedleMapType, + visitSuperBlock func(SuperBlock) error, + readNeedleBody bool, + visitNeedle func(n *Needle, offset int64) error) (err error) { + var v *Volume + if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { + return fmt.Errorf("Failed to load volume %d: %v", id, err) + } + if err = visitSuperBlock(v.SuperBlock); err != nil { + return fmt.Errorf("Failed to process volume %d super block: %v", id, err) + } + + version := v.Version() + + offset := int64(SuperBlockSize) + n, rest, e := ReadNeedleHeader(v.dataFile, version, offset) + if e != nil { + err = fmt.Errorf("cannot read needle header: %v", e) + return + } + for n != nil { + if readNeedleBody { + if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil { + glog.V(0).Infof("cannot read needle body: %v", err) + //err = fmt.Errorf("cannot read needle body: %v", err) + //return + } + if n.DataSize >= n.Size { + // this should come from a bug reported on #87 and #93 + // fixed in v0.69 + // remove this whole "if" clause later, long after 0.69 + oldRest, oldSize := rest, n.Size + padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + n.Size = 0 + rest = n.Size + NeedleChecksumSize + padding + if rest%NeedlePaddingSize != 0 { + rest += (NeedlePaddingSize - rest%NeedlePaddingSize) + } + glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n) + } + } + if err 
= visitNeedle(n, offset); err != nil { + glog.V(0).Infof("visit needle error: %v", err) + } + offset += int64(NeedleHeaderSize) + int64(rest) + glog.V(4).Infof("==> new entry offset %d", offset) + if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("cannot read needle header: %v", err) + } + glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) + } + + return +} + +func (v *Volume) ContentSize() uint64 { + return v.nm.ContentSize() +} + +func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) { + exists = true + fi, err := os.Stat(filename) + if os.IsNotExist(err) { + exists = false + return + } + if err != nil { + //stat failed for another reason; report the file as neither readable nor writable + return + } + if fi.Mode()&0400 != 0 { + canRead = true + } + if fi.Mode()&0200 != 0 { + canWrite = true + } + modTime = fi.ModTime() + return +} + +// volume is expired if modified time + volume ttl < now +// except when volume is empty +// or when the volume does not have a ttl +// or when volumeSizeLimit is 0 when server just starts +func (v *Volume) expired(volumeSizeLimit uint64) bool { + if volumeSizeLimit == 0 { + //skip if we don't know size limit + return false + } + if v.ContentSize() == 0 { + return false + } + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 + glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + if int64(v.Ttl.Minutes()) < livedMinutes { + return true + } + return false +} + +// wait the smaller of maxDelayMinutes or 10% of the ttl minutes before removal +func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + removalDelay := v.Ttl.Minutes() / 10 + if removalDelay > maxDelayMinutes { + removalDelay = maxDelayMinutes + } + + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + return true + } + return false +} diff --git a/weed/storage/volume_id.go b/weed/storage/volume_id.go new file mode 100644 index 000000000..0333c6cf0 --- /dev/null +++ b/weed/storage/volume_id.go @@ -0,0 +1,18 @@ +package storage + +import ( + "strconv" +) + +type VolumeId uint32 + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 32) + return VolumeId(volumeId), err +} +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) +} +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) +} diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go new file mode 100644 index 000000000..b3068eec3 --- /dev/null +++ b/weed/storage/volume_info.go @@ -0,0 +1,65 @@ +package storage + +import ( + "fmt" + "sort" + + "github.com/chrislusf/seaweedfs/weed/operation" +) + +type VolumeInfo struct { + Id VolumeId + Size uint64 + ReplicaPlacement *ReplicaPlacement + Ttl *TTL + Collection string + Version Version + FileCount int + DeleteCount int + DeletedByteCount uint64 + ReadOnly bool +} + +func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) { + vi = VolumeInfo{ + Id: VolumeId(*m.Id), + Size: *m.Size, + Collection: *m.Collection, + FileCount: int(*m.FileCount), + DeleteCount: int(*m.DeleteCount), + DeletedByteCount: *m.DeletedByteCount, + ReadOnly: *m.ReadOnly, + Version: Version(*m.Version), + } + rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement)) + if e != nil { + return vi, e + } + vi.ReplicaPlacement
= rp + vi.Ttl = LoadTTLFromUint32(*m.Ttl) + return vi, nil +} + +func (vi VolumeInfo) String() string { + return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", + vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) +} + +/*VolumesInfo sorting*/ + +type volumeInfos []*VolumeInfo + +func (vis volumeInfos) Len() int { + return len(vis) +} + +func (vis volumeInfos) Less(i, j int) bool { + return vis[i].Id < vis[j].Id +} + +func (vis volumeInfos) Swap(i, j int) { + vis[i], vis[j] = vis[j], vis[i] +} + +func sortVolumeInfos(vis volumeInfos) { + sort.Sort(vis) +} diff --git a/weed/storage/volume_info_test.go b/weed/storage/volume_info_test.go new file mode 100644 index 000000000..9a9c43ad2 --- /dev/null +++ b/weed/storage/volume_info_test.go @@ -0,0 +1,23 @@ +package storage + +import "testing" + +func TestSortVolumeInfos(t *testing.T) { + vis := []*VolumeInfo{ + &VolumeInfo{ + Id: 2, + }, + &VolumeInfo{ + Id: 1, + }, + &VolumeInfo{ + Id: 3, + }, + } + sortVolumeInfos(vis) + for i := 0; i < len(vis); i++ { + if vis[i].Id != VolumeId(i+1) { + t.Fatal() + } + } +} diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go new file mode 100644 index 000000000..fc773273d --- /dev/null +++ b/weed/storage/volume_super_block.go @@ -0,0 +1,81 @@ +package storage + +import ( + "fmt" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + SuperBlockSize = 8 +) + +/* +* Super block currently has 8 bytes allocated for each volume. +* Byte 0: version, 1 or 2 +* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc +* Byte 2 and byte 3: Time to live. See TTL for definition +* Byte 4 and byte 5: The number of times the volume has been compacted. +* Rest bytes: Reserved + */ +type SuperBlock struct { + version Version + ReplicaPlacement *ReplicaPlacement + Ttl *TTL + CompactRevision uint16 +} + +func (s *SuperBlock) Version() Version { + return s.version +} +func (s *SuperBlock) Bytes() []byte { + header := make([]byte, SuperBlockSize) + header[0] = byte(s.version) + header[1] = s.ReplicaPlacement.Byte() + s.Ttl.ToBytes(header[2:4]) + util.Uint16toBytes(header[4:6], s.CompactRevision) + return header +} + +func (v *Volume) maybeWriteSuperBlock() error { + stat, e := v.dataFile.Stat() + if e != nil { + glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e) + return e + } + if stat.Size() == 0 { + v.SuperBlock.version = CurrentVersion + _, e = v.dataFile.Write(v.SuperBlock.Bytes()) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it!
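+ //os.Create reopens the file read-write and truncates it, so the super block can be written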
+ if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { + if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { + v.readOnly = false + } + } + } + } + return e +} +func (v *Volume) readSuperBlock() (err error) { + if _, err = v.dataFile.Seek(0, 0); err != nil { + return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err) + } + header := make([]byte, SuperBlockSize) + if _, e := v.dataFile.Read(header); e != nil { + return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e) + } + v.SuperBlock, err = ParseSuperBlock(header) + return err +} +func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { + superBlock.version = Version(header[0]) + if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { + err = fmt.Errorf("cannot read replica type: %s", err.Error()) + } + superBlock.Ttl = LoadTTLFromBytes(header[2:4]) + superBlock.CompactRevision = util.BytesToUint16(header[4:6]) + return +} diff --git a/weed/storage/volume_super_block_test.go b/weed/storage/volume_super_block_test.go new file mode 100644 index 000000000..13db4b194 --- /dev/null +++ b/weed/storage/volume_super_block_test.go @@ -0,0 +1,23 @@ +package storage + +import ( + "testing" +) + +func TestSuperBlockReadWrite(t *testing.T) { + rp, _ := NewReplicaPlacementFromByte(byte(001)) + ttl, _ := ReadTTL("15d") + s := &SuperBlock{ + version: CurrentVersion, + ReplicaPlacement: rp, + Ttl: ttl, + } + + bytes := s.Bytes() + + if !(bytes[2] == 15 && bytes[3] == Day) { + println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) + t.Fail() + } + +} diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go new file mode 100644 index 000000000..231ff31c2 --- /dev/null +++ b/weed/storage/volume_sync.go @@ -0,0 +1,213 @@ +package storage + +import ( + "fmt" + "io" + "io/ioutil" + "net/url" + "os" + "sort" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// A volume syncs with a master volume in 2 steps: +// 1. The slave checks the master side to find the subscription checkpoint +// to set up the replication. +// 2. The slave receives the updates from the master + +/* +Assume the slave volume needs to follow the master volume. + +The master volume could be compacted, and could be many files ahead of +the slave volume. + +Step 1: +The slave volume will ask the master volume for a snapshot +of (existing file entries, last offset, number of compacted times). + +For each entry x in master existing file entries: + if x does not exist locally: + add x locally + +For each entry y in local slave existing file entries: + if y does not exist on master: + delete y locally + +Step 2: +After this, use the last offset and number of compacted times to request +the master volume to send a new file, and keep looping. If the number of +compacted times is changed, go back to step 1 (very likely this can be +optimized more later).
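+ +A rough sketch of step 1's delta walk, in illustrative pseudo-Go only (masterMap, +slaveMap, fetchFromMaster and deleteLocally are stand-in names, not real ones here): + + for key, entry := range masterMap { if _, ok := slaveMap[key]; !ok { fetchFromMaster(entry) } } + for key := range slaveMap { if _, ok := masterMap[key]; !ok { deleteLocally(key) } }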
+ +*/ + +func (v *Volume) Synchronize(volumeServer string) (err error) { + var lastCompactRevision uint16 = 0 + var compactRevision uint16 = 0 + var masterMap CompactMap + for i := 0; i < 3; i++ { + if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { + return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) + } + if lastCompactRevision != compactRevision && lastCompactRevision != 0 { + if err = v.Compact(); err != nil { + return fmt.Errorf("Compact volume before synchronizing: %v", err) + } + if err = v.commitCompact(); err != nil { + return fmt.Errorf("Commit compact before synchronizing: %v", err) + } + } + lastCompactRevision = compactRevision + if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil { + return + } + } + return +} + +type ByOffset []NeedleValue + +func (a ByOffset) Len() int { return len(a) } +func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } + +// trySynchronizing syncs with the remote volume server incrementally by +// making up the local and remote delta. +func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error { + slaveIdxFile, err := os.Open(v.nm.IndexFileName()) + if err != nil { + return fmt.Errorf("Open volume %d index file: %v", v.Id, err) + } + defer slaveIdxFile.Close() + slaveMap, err := LoadNeedleMap(slaveIdxFile) + if err != nil { + return fmt.Errorf("Load volume %d index file: %v", v.Id, err) + } + var delta []NeedleValue + if err := masterMap.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok { + return nil // skip intersection + } + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Add master entry: %v", err) + } + if err := slaveMap.m.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := masterMap.Get(needleValue.Key); ok { + return nil // skip intersection + } + needleValue.Size = 0 + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Remove local entry: %v", err) + } + + // mirror the ordering of the remote .dat file's needle entries + sort.Sort(ByOffset(delta)) + + // make up the delta + fetchCount := 0 + volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data" + for _, needleValue := range delta { + if needleValue.Size == 0 { + // remove file entry from local + v.removeNeedle(needleValue.Key) + continue + } + // add master file entry to local data file + if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil { + glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err) + return err + } + fetchCount++ + } + glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer) + return nil +} + +func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) { + m = NewCompactMap() + + syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + return m, 0, 0, err + } + + total := 0 + err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { + // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) + if offset != 0 && size != 0 { + m.Set(Key(key),
offset, size) + } else { + m.Delete(Key(key)) + } + total++ + }) + + glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision) + return m, syncStatus.TailOffset, syncStatus.CompactRevision, err + +} + +func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse { + var syncStatus = operation.SyncVolumeResponse{} + if stat, err := v.dataFile.Stat(); err == nil { + syncStatus.TailOffset = uint64(stat.Size()) + } + syncStatus.IdxFileSize = v.nm.IndexFileSize() + syncStatus.CompactRevision = v.SuperBlock.CompactRevision + syncStatus.Ttl = v.SuperBlock.Ttl.String() + syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() + return syncStatus +} + +func (v *Volume) IndexFileContent() ([]byte, error) { + return v.nm.IndexFileContent() +} + +// removeNeedle removes one needle by needle key +func (v *Volume) removeNeedle(key Key) { + n := new(Needle) + n.Id = uint64(key) + v.delete(n) +} + +// fetchNeedle fetches a remote volume needle by vid, id, offset +// The compact revision is checked first in case the remote volume +// is compacted and the offset is no longer valid. +func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, + needleValue NeedleValue, compactRevision uint16) error { + // add master file entry to local data file + values := make(url.Values) + values.Add("revision", strconv.Itoa(int(compactRevision))) + values.Add("volume", v.Id.String()) + values.Add("id", needleValue.Key.String()) + values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10)) + values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10)) + glog.V(4).Infof("Fetch %+v", needleValue) + return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err) + } + offset, err := v.AppendBlob(b) + if err != nil { + return fmt.Errorf("Appending volume %d error: %v", v.Id, err) + } + // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size) + v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size) + return nil + }) +} diff --git a/weed/storage/volume_ttl.go b/weed/storage/volume_ttl.go new file mode 100644 index 000000000..4318bb048 --- /dev/null +++ b/weed/storage/volume_ttl.go @@ -0,0 +1,135 @@ +package storage + +import ( + "fmt" + "strconv" +) + +const ( + //stored unit types + Empty byte = iota + Minute + Hour + Day + Week + Month + Year +) + +type TTL struct { + count byte + unit byte +} + +var EMPTY_TTL = &TTL{} + +// translate a readable ttl to internal ttl +// Supports format example: +// 3m: 3 minutes +// 4h: 4 hours +// 5d: 5 days +// 6w: 6 weeks +// 7M: 7 months +// 8y: 8 years +func ReadTTL(ttlString string) (*TTL, error) { + if ttlString == "" { + return EMPTY_TTL, nil + } + ttlBytes := []byte(ttlString) + unitByte := ttlBytes[len(ttlBytes)-1] + countBytes := ttlBytes[0 : len(ttlBytes)-1] + if '0' <= unitByte && unitByte <= '9' { + countBytes = ttlBytes + unitByte = 'm' + } + count, err := strconv.Atoi(string(countBytes)) + if err != nil { + return EMPTY_TTL, err + } + if count < 0 || count > 255 { + //the count is stored in a single byte + return EMPTY_TTL, fmt.Errorf("ttl count %d is out of range [0, 255]", count) + } + unit := toStoredByte(unitByte) + return &TTL{count: byte(count), unit: unit}, nil +} + +// read stored bytes to a ttl +func LoadTTLFromBytes(input []byte) (t *TTL) { + return &TTL{count: input[0], unit: input[1]} +} + +// read a stored uint32 to a ttl +func LoadTTLFromUint32(ttl uint32) (t *TTL) { + input := make([]byte, 2) + input[1] = byte(ttl) + input[0]
= byte(ttl >> 8) + return LoadTTLFromBytes(input) +} + +// save stored bytes to an output with 2 bytes +func (t *TTL) ToBytes(output []byte) { + output[0] = t.count + output[1] = t.unit +} + +func (t *TTL) ToUint32() (output uint32) { + output = uint32(t.count) << 8 + output += uint32(t.unit) + return output +} + +func (t *TTL) String() string { + if t == nil || t.count == 0 { + return "" + } + if t.unit == Empty { + return "" + } + countString := strconv.Itoa(int(t.count)) + switch t.unit { + case Minute: + return countString + "m" + case Hour: + return countString + "h" + case Day: + return countString + "d" + case Week: + return countString + "w" + case Month: + return countString + "M" + case Year: + return countString + "y" + } + return "" +} + +func toStoredByte(readableUnitByte byte) byte { + switch readableUnitByte { + case 'm': + return Minute + case 'h': + return Hour + case 'd': + return Day + case 'w': + return Week + case 'M': + return Month + case 'y': + return Year + } + return 0 +} + +func (t TTL) Minutes() uint32 { + switch t.unit { + case Empty: + return 0 + case Minute: + return uint32(t.count) + case Hour: + return uint32(t.count) * 60 + case Day: + return uint32(t.count) * 60 * 24 + case Week: + return uint32(t.count) * 60 * 24 * 7 + case Month: + return uint32(t.count) * 60 * 24 * 31 + case Year: + return uint32(t.count) * 60 * 24 * 365 + } + return 0 +} diff --git a/weed/storage/volume_ttl_test.go b/weed/storage/volume_ttl_test.go new file mode 100644 index 000000000..216469a4c --- /dev/null +++ b/weed/storage/volume_ttl_test.go @@ -0,0 +1,60 @@ +package storage + +import ( + "testing" +) + +func TestTTLReadWrite(t *testing.T) { + ttl, _ := ReadTTL("") + if ttl.Minutes() != 0 { + t.Errorf("empty ttl:%v", ttl) + } + + ttl, _ = ReadTTL("9") + if ttl.Minutes() != 9 { + t.Errorf("9 ttl:%v", ttl) + } + + ttl, _ = ReadTTL("8m") + if ttl.Minutes() != 8 { + t.Errorf("8m ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5h") + if ttl.Minutes() != 300 { + t.Errorf("5h ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5d") + if ttl.Minutes() != 5*24*60 { + t.Errorf("5d ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5w") + if ttl.Minutes() != 5*7*24*60 { + t.Errorf("5w ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5M") + if ttl.Minutes() != 5*31*24*60 { + t.Errorf("5M ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5y") + if ttl.Minutes() != 5*365*24*60 { + t.Errorf("5y ttl:%v", ttl) + } + + output := make([]byte, 2) + ttl.ToBytes(output) + ttl2 := LoadTTLFromBytes(output) + if ttl.Minutes() != ttl2.Minutes() { + t.Errorf("ttl:%v ttl2:%v", ttl, ttl2) + } + + ttl3 := LoadTTLFromUint32(ttl.ToUint32()) + if ttl.Minutes() != ttl3.Minutes() { + t.Errorf("ttl:%v ttl3:%v", ttl, ttl3) + } + +} diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go new file mode 100644 index 000000000..9b9a27816 --- /dev/null +++ b/weed/storage/volume_vacuum.go @@ -0,0 +1,93 @@ +package storage + +import ( + "fmt" + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func (v *Volume) garbageLevel() float64 { + return float64(v.nm.DeletedSize()) / float64(v.ContentSize()) +} + +func (v *Volume) Compact() error { + glog.V(3).Infof("Compacting ...") + //no need to lock for copy on write + //v.accessLock.Lock() + //defer v.accessLock.Unlock() + //glog.V(3).Infof("Got Compaction lock...") + + filePath := v.FileName() + glog.V(3).Infof("creating copies for volume %d ...", v.Id) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") +} +func (v *Volume) commitCompact() error { + 
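//swap in the compacted copies: rename .cpd over .dat and .cpx over .idx, then reload the volume +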
glog.V(3).Infof("Committing vacuuming...") + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + glog.V(3).Infof("Got Committing lock...") + v.nm.Close() + _ = v.dataFile.Close() + var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { + return e + } + if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { + return e + } + //glog.V(3).Infof("Pretending to be vacuuming...") + //time.Sleep(20 * time.Second) + glog.V(3).Infof("Loading Commit file...") + if e = v.load(true, false, v.needleMapKind); e != nil { + return e + } + return nil +} + +func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { + var ( + dst, idx *os.File + ) + if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + return + } + defer dst.Close() + + if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + return + } + defer idx.Close() + + nm := NewNeedleMap(idx) + new_offset := int64(SuperBlockSize) + + now := uint64(time.Now().Unix()) + + err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, + func(superBlock SuperBlock) error { + superBlock.CompactRevision++ + _, err = dst.Write(superBlock.Bytes()) + return err + }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil + } + nv, ok := v.nm.Get(n.Id) + glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) + if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { + if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + return fmt.Errorf("cannot put needle: %s", err) + } + if _, err = n.Append(dst, v.Version()); err != nil { + return fmt.Errorf("cannot append needle: %s", err) + } + new_offset += n.DiskSize() + glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size) + } + return nil + }) + + return +} diff --git a/weed/storage/volume_version.go b/weed/storage/volume_version.go new file mode 100644 index 000000000..2e9f58aa2 --- /dev/null +++ b/weed/storage/volume_version.go @@ -0,0 +1,9 @@ +package storage + +type Version uint8 + +const ( + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 +) |
