diff options
| author | Chris Lu <chris.lu@gmail.com> | 2013-02-10 03:09:26 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2013-02-10 03:09:26 -0800 |
| commit | cb4e8ec16b5c204718f1efdc1731f1bdd5698ff3 (patch) | |
| tree | 47f27040c3d80e6849932bf2acf54b68c930f72b /src/weed/storage | |
| parent | d3b267bac27018b7f70dfec7c258d0556fff4c14 (diff) | |
| download | seaweedfs-cb4e8ec16b5c204718f1efdc1731f1bdd5698ff3.tar.xz seaweedfs-cb4e8ec16b5c204718f1efdc1731f1bdd5698ff3.zip | |
re-organize code directory structure
Diffstat (limited to 'src/weed/storage')
| -rw-r--r-- | src/weed/storage/compact_map.go | 182 | ||||
| -rw-r--r-- | src/weed/storage/compact_map_perf_test.go | 43 | ||||
| -rw-r--r-- | src/weed/storage/compact_map_test.go | 63 | ||||
| -rw-r--r-- | src/weed/storage/compress.go | 57 | ||||
| -rw-r--r-- | src/weed/storage/crc.go | 21 | ||||
| -rw-r--r-- | src/weed/storage/needle.go | 132 | ||||
| -rw-r--r-- | src/weed/storage/needle_map.go | 99 | ||||
| -rw-r--r-- | src/weed/storage/needle_read_write.go | 238 | ||||
| -rw-r--r-- | src/weed/storage/replication_type.go | 123 | ||||
| -rw-r--r-- | src/weed/storage/sample.idx | bin | 0 -> 27140560 bytes | |||
| -rw-r--r-- | src/weed/storage/store.go | 204 | ||||
| -rw-r--r-- | src/weed/storage/volume.go | 274 | ||||
| -rw-r--r-- | src/weed/storage/volume_id.go | 18 | ||||
| -rw-r--r-- | src/weed/storage/volume_info.go | 13 | ||||
| -rw-r--r-- | src/weed/storage/volume_version.go | 11 |
15 files changed, 1478 insertions, 0 deletions
diff --git a/src/weed/storage/compact_map.go b/src/weed/storage/compact_map.go new file mode 100644 index 000000000..0b33961c4 --- /dev/null +++ b/src/weed/storage/compact_map.go @@ -0,0 +1,182 @@ +package storage + +import () + +type NeedleValue struct { + Key Key + Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 "Size of the data portion" +} + +const ( + batch = 100000 +) + +type Key uint64 + +type CompactSection struct { + values []NeedleValue + overflow map[Key]NeedleValue + start Key + end Key + counter int +} + +func NewCompactSection(start Key) CompactSection { + return CompactSection{ + values: make([]NeedleValue, batch), + overflow: make(map[Key]NeedleValue), + start: start, + } +} + +//return old entry size +func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { + ret := uint32(0) + if key > cs.end { + cs.end = key + } + if i := cs.binarySearchValues(key); i >= 0 { + ret = cs.values[i].Size + //println("key", key, "old size", ret) + cs.values[i].Offset, cs.values[i].Size = offset, size + } else { + needOverflow := cs.counter >= batch + needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key + if needOverflow { + //println("start", cs.start, "counter", cs.counter, "key", key) + if oldValue, found := cs.overflow[key]; found { + ret = oldValue.Size + } + cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size} + } else { + p := &cs.values[cs.counter] + p.Key, p.Offset, p.Size = key, offset, size + //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) + cs.counter++ + } + } + return ret +} + +//return old entry size +func (cs *CompactSection) Delete(key Key) uint32 { + ret := uint32(0) + if i := cs.binarySearchValues(key); i >= 0 { + if cs.values[i].Size > 0 { + ret = cs.values[i].Size + cs.values[i].Size = 0 + } + } + if v, found := cs.overflow[key]; found { + delete(cs.overflow, key) + ret = v.Size + } + return ret +} +func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { + if v, ok := cs.overflow[key]; ok { + return &v, true + } + if i := cs.binarySearchValues(key); i >= 0 { + return &cs.values[i], true + } + return nil, false +} +func (cs *CompactSection) binarySearchValues(key Key) int { + l, h := 0, cs.counter-1 + if h >= 0 && cs.values[h].Key < key { + return -2 + } + //println("looking for key", key) + for l <= h { + m := (l + h) / 2 + //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size) + if cs.values[m].Key < key { + l = m + 1 + } else if key < cs.values[m].Key { + h = m - 1 + } else { + //println("found", m) + return m + } + } + return -1 +} + +//This map assumes mostly inserting increasing keys +type CompactMap struct { + list []CompactSection +} + +func NewCompactMap() CompactMap { + return CompactMap{} +} + +func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + //println(x, "creating", len(cm.list), "section1, starting", key) + cm.list = append(cm.list, NewCompactSection(key)) + x = len(cm.list) - 1 + } + return cm.list[x].Set(key, offset, size) +} +func (cm *CompactMap) Delete(key Key) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return uint32(0) + } + return cm.list[x].Delete(key) +} +func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return nil, false + } + return cm.list[x].Get(key) +} +func (cm *CompactMap) binarySearchCompactSection(key Key) int { + l, h := 0, len(cm.list)-1 + if h < 0 { + return -5 + } + if cm.list[h].start <= key { + if cm.list[h].counter < batch || key <= cm.list[h].end { + return h + } else { + return -4 + } + } + for l <= h { + m := (l + h) / 2 + if key < cm.list[m].start { + h = m - 1 + } else { // cm.list[m].start <= key + if cm.list[m+1].start <= key { + l = m + 1 + } else { + return m + } + } + } + return -3 +} + +func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { + for _, cs := range cm.list { + for _, v := range cs.overflow { + if err := visit(v); err != nil { + return err + } + } + for _, v := range cs.values { + if _, found := cs.overflow[v.Key]; !found { + if err := visit(v); err != nil { + return err + } + } + } + } + return nil +} diff --git a/src/weed/storage/compact_map_perf_test.go b/src/weed/storage/compact_map_perf_test.go new file mode 100644 index 000000000..b99356a73 --- /dev/null +++ b/src/weed/storage/compact_map_perf_test.go @@ -0,0 +1,43 @@ +package storage + +import ( + "log" + "os" + "weed/util" + "testing" +) + +func TestMemoryUsage(t *testing.T) { + + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + +} + +func LoadNewNeedleMap(file *os.File) CompactMap { + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } + + count, e = file.Read(bytes) + } + return m +} diff --git a/src/weed/storage/compact_map_test.go b/src/weed/storage/compact_map_test.go new file mode 100644 index 000000000..e76e9578d --- /dev/null +++ b/src/weed/storage/compact_map_test.go @@ -0,0 +1,63 @@ +package storage + +import ( + "testing" +) + +func TestXYZ(t *testing.T) { + m := NewCompactMap() + for i := uint32(0); i < 100*batch; i += 2 { + m.Set(Key(i), i, i) + } + + for i := uint32(0); i < 100*batch; i += 37 { + m.Delete(Key(i)) + } + + for i := uint32(0); i < 10*batch; i += 3 { + m.Set(Key(i), i+11, i+5) + } + + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // println(i, "=", v.Key, v.Offset, v.Size) + // } + // } + + for i := uint32(0); i < 10*batch; i++ { + v, ok := m.Get(Key(i)) + if i%3 == 0 { + if !ok { + t.Fatal("key", i, "missing!") + } + if v.Size != i+5 { + t.Fatal("key", i, "size", v.Size) + } + } else if i%37 == 0 { + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v.Size != i { + t.Fatal("key", i, "size", v.Size) + } + } + } + + for i := uint32(10 * batch); i < 100*batch; i++ { + v, ok := m.Get(Key(i)) + if i%37 == 0 { + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v == nil { + t.Fatal("key", i, "missing") + } + if v.Size != i { + t.Fatal("key", i, "size", v.Size) + } + } + } + +} diff --git a/src/weed/storage/compress.go b/src/weed/storage/compress.go new file mode 100644 index 000000000..256789c9c --- /dev/null +++ b/src/weed/storage/compress.go @@ -0,0 +1,57 @@ +package storage + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "io/ioutil" + "strings" +) + +/* +* Default more not to gzip since gzip can be done on client side. +*/ +func IsGzippable(ext, mtype string) bool { + if strings.HasPrefix(mtype, "text/") { + return true + } + switch ext { + case ".zip", ".rar", ".gz", ".bz2", ".xz": + return false + case ".pdf", ".txt", ".html", ".css", ".js", ".json": + return true + } + if strings.HasPrefix(mtype, "application/") { + if strings.HasSuffix(mtype, "xml") { + return true + } + if strings.HasSuffix(mtype, "script") { + return true + } + } + return false +} + +func GzipData(input []byte) ([]byte, error) { + buf := new(bytes.Buffer) + w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) + if _, err := w.Write(input); err != nil { + println("error compressing data:", err) + return nil, err + } + if err := w.Close(); err != nil { + println("error closing compressed data:", err) + return nil, err + } + return buf.Bytes(), nil +} +func UnGzipData(input []byte) ([]byte, error) { + buf := bytes.NewBuffer(input) + r, _ := gzip.NewReader(buf) + defer r.Close() + output, err := ioutil.ReadAll(r) + if err != nil { + println("error uncompressing data:", err) + } + return output, err +} diff --git a/src/weed/storage/crc.go b/src/weed/storage/crc.go new file mode 100644 index 000000000..198352e68 --- /dev/null +++ b/src/weed/storage/crc.go @@ -0,0 +1,21 @@ +package storage + +import ( + "hash/crc32" +) + +var table = crc32.MakeTable(crc32.Castagnoli) + +type CRC uint32 + +func NewCRC(b []byte) CRC { + return CRC(0).Update(b) +} + +func (c CRC) Update(b []byte) CRC { + return CRC(crc32.Update(uint32(c), table, b)) +} + +func (c CRC) Value() uint32 { + return uint32(c>>15|c<<17) + 0xa282ead8 +} diff --git a/src/weed/storage/needle.go b/src/weed/storage/needle.go new file mode 100644 index 000000000..23016f98b --- /dev/null +++ b/src/weed/storage/needle.go @@ -0,0 +1,132 @@ +package storage + +import ( + "encoding/hex" + "fmt" + "io/ioutil" + "mime" + "net/http" + "path" + "weed/util" + "strconv" + "strings" +) + +const ( + NeedleHeaderSize = 16 //should never change this + NeedlePaddingSize = 8 + NeedleChecksumSize = 4 +) + +type Needle struct { + Cookie uint32 "random number to mitigate brute force lookups" + Id uint64 "needle id" + Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime" + + DataSize uint32 "Data size" //version2 + Data []byte "The actual file data" + Flags byte "boolean flags" //version2 + NameSize uint8 //version2 + Name []byte "maximum 256 characters" //version2 + MimeSize uint8 //version2 + Mime []byte "maximum 256 characters" //version2 + + Checksum CRC "CRC32 to check integrity" + Padding []byte "Aligned to 8 bytes" +} + +func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { + + n = new(Needle) + form, fe := r.MultipartReader() + if fe != nil { + fmt.Println("MultipartReader [ERROR]", fe) + e = fe + return + } + part, fe := form.NextPart() + if fe != nil { + fmt.Println("Reading Multi part [ERROR]", fe) + e = fe + return + } + fname = part.FileName() + fname = path.Base(fname) + data, _ := ioutil.ReadAll(part) + dotIndex := strings.LastIndex(fname, ".") + ext, mtype := "", "" + if dotIndex > 0 { + ext = fname[dotIndex:] + mtype = mime.TypeByExtension(ext) + } + contentType := part.Header.Get("Content-Type") + if contentType != "" && mtype != contentType && len(contentType) < 256 { + n.Mime = []byte(contentType) + n.SetHasMime() + mtype = contentType + } + if IsGzippable(ext, mtype) { + if data, e = GzipData(data); e != nil { + return + } + n.SetGzipped() + } + if ext == ".gz" { + n.SetGzipped() + } + if len(fname) < 256 { + if strings.HasSuffix(fname, ".gz") { + n.Name = []byte(fname[:len(fname)-3]) + } else { + n.Name = []byte(fname) + } + n.SetHasName() + } + + n.Data = data + n.Checksum = NewCRC(data) + + commaSep := strings.LastIndex(r.URL.Path, ",") + dotSep := strings.LastIndex(r.URL.Path, ".") + fid := r.URL.Path[commaSep+1:] + if dotSep > 0 { + fid = r.URL.Path[commaSep+1 : dotSep] + } + + n.ParsePath(fid) + + return +} +func (n *Needle) ParsePath(fid string) { + length := len(fid) + if length <= 8 { + if length > 0 { + println("Invalid fid", fid, "length", length) + } + return + } + delta := "" + deltaIndex := strings.LastIndex(fid, "_") + if deltaIndex > 0 { + fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] + } + n.Id, n.Cookie = ParseKeyHash(fid) + if delta != "" { + d, e := strconv.ParseUint(delta, 10, 64) + if e == nil { + n.Id += d + } + } +} + +func ParseKeyHash(key_hash_string string) (uint64, uint32) { + key_hash_bytes, khe := hex.DecodeString(key_hash_string) + key_hash_len := len(key_hash_bytes) + if khe != nil || key_hash_len <= 4 { + println("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe) + return 0, 0 + } + key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4]) + hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len]) + return key, hash +} diff --git a/src/weed/storage/needle_map.go b/src/weed/storage/needle_map.go new file mode 100644 index 000000000..505dd36dd --- /dev/null +++ b/src/weed/storage/needle_map.go @@ -0,0 +1,99 @@ +package storage + +import ( + //"log" + "os" + "weed/util" +) + +type NeedleMap struct { + indexFile *os.File + m CompactMap + + //transient + bytes []byte + + deletionCounter int + fileCounter int + deletionByteCounter uint64 + fileByteCounter uint64 +} + +func NewNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: NewCompactMap(), + bytes: make([]byte, 16), + indexFile: file, + } + return nm +} + +const ( + RowsToRead = 1024 +) + +func LoadNeedleMap(file *os.File) *NeedleMap { + nm := NewNeedleMap(file) + bytes := make([]byte, 16*RowsToRead) + count, e := nm.indexFile.Read(bytes) + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } else { + oldSize := nm.m.Delete(Key(key)) + //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } + + count, e = nm.indexFile.Read(bytes) + } + return nm +} + +func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { + oldSize := nm.m.Set(Key(key), offset, size) + util.Uint64toBytes(nm.bytes[0:8], key) + util.Uint32toBytes(nm.bytes[8:12], offset) + util.Uint32toBytes(nm.bytes[12:16], size) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + return nm.indexFile.Write(nm.bytes) +} +func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + element, ok = nm.m.Get(Key(key)) + return +} +func (nm *NeedleMap) Delete(key uint64) { + nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) + util.Uint64toBytes(nm.bytes[0:8], key) + util.Uint32toBytes(nm.bytes[8:12], 0) + util.Uint32toBytes(nm.bytes[12:16], 0) + nm.indexFile.Write(nm.bytes) + nm.deletionCounter++ +} +func (nm *NeedleMap) Close() { + nm.indexFile.Close() +} +func (nm *NeedleMap) ContentSize() uint64 { + return nm.fileByteCounter +} +func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { + return nm.m.Visit(visit) +} diff --git a/src/weed/storage/needle_read_write.go b/src/weed/storage/needle_read_write.go new file mode 100644 index 000000000..1dac6d5c4 --- /dev/null +++ b/src/weed/storage/needle_read_write.go @@ -0,0 +1,238 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + "weed/util" +) + +const ( + FlagGzip = 0x01 + FlagHasName = 0x02 + FlagHasMime = 0x04 +) + +func (n *Needle) DiskSize() uint32 { + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize +} +func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { + if s, ok := w.(io.Seeker); ok { + if end, e := s.Seek(0, 1); e == nil { + defer func(s io.Seeker, off int64) { + if err != nil { + if _, e = s.Seek(off, 0); e != nil { + fmt.Printf("Failed to seek back to %d with error: %s\n", w, off, e) + } + } + }(s, end) + } else { + err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e) + return + } + } + switch version { + case Version1: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.Size = uint32(len(n.Data)) + size = n.Size + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return + case Version2: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) + if n.DataSize > 0 { + n.Size = 4 + n.DataSize + 1 + if n.HasName() { + n.Size = n.Size + 1 + uint32(n.NameSize) + } + if n.HasMime() { + n.Size = n.Size + 1 + uint32(n.MimeSize) + } + } + size = n.DataSize + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if n.DataSize > 0 { + util.Uint32toBytes(header[0:4], n.DataSize) + if _, err = w.Write(header[0:4]); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + util.Uint8toBytes(header[0:1], n.Flags) + if _, err = w.Write(header[0:1]); err != nil { + return + } + } + if n.HasName() { + util.Uint8toBytes(header[0:1], n.NameSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Name); err != nil { + return + } + } + if n.HasMime() { + util.Uint8toBytes(header[0:1], n.MimeSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Mime); err != nil { + return + } + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return n.DataSize, err + } + return 0, fmt.Errorf("Unsupported Version! (%d)", version) +} + +func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) { + switch version { + case Version1: + bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) + if ret, err = r.Read(bytes); err != nil { + return + } + n.readNeedleHeader(bytes) + n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + if checksum != NewCRC(n.Data).Value() { + return 0, errors.New("CRC error! Data On Disk Corrupted!") + } + return + case Version2: + if size == 0 { + return 0, nil + } + bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) + if ret, err = r.Read(bytes); err != nil { + return + } + if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) { + return 0, errors.New("File Entry Not Found!") + } + n.readNeedleHeader(bytes) + if n.Size != size { + return 0, fmt.Errorf("File Entry Not Found! Needle %d Memory %d", n.Size, size) + } + n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize]) + if checksum != NewCRC(n.Data).Value() { + return 0, errors.New("CRC error! Data On Disk Corrupted!") + } + return + } + return 0, fmt.Errorf("Unsupported Version! (%d)", version) +} +func (n *Needle) readNeedleHeader(bytes []byte) { + n.Cookie = util.BytesToUint32(bytes[0:4]) + n.Id = util.BytesToUint64(bytes[4:12]) + n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) +} +func (n *Needle) readNeedleDataVersion2(bytes []byte) { + index, lenBytes := 0, len(bytes) + if index < lenBytes { + n.DataSize = util.BytesToUint32(bytes[index : index+4]) + index = index + 4 + n.Data = bytes[index : index+int(n.DataSize)] + index = index + int(n.DataSize) + n.Flags = bytes[index] + index = index + 1 + } + if index < lenBytes && n.HasName() { + n.NameSize = uint8(bytes[index]) + index = index + 1 + n.Name = bytes[index : index+int(n.NameSize)] + index = index + int(n.NameSize) + } + if index < lenBytes && n.HasMime() { + n.MimeSize = uint8(bytes[index]) + index = index + 1 + n.Mime = bytes[index : index+int(n.MimeSize)] + } +} + +func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) { + n = new(Needle) + if version == Version1 || version == Version2 { + bytes := make([]byte, NeedleHeaderSize) + var count int + count, err = r.Read(bytes) + if count <= 0 || err != nil { + return nil, 0, err + } + n.readNeedleHeader(bytes) + padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + bodyLength = n.Size + NeedleChecksumSize + padding + } + return +} + +//n should be a needle already read the header +//the input stream will read until next file entry +func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) { + if bodyLength <= 0 { + return nil + } + switch version { + case Version1: + bytes := make([]byte, bodyLength) + if _, err = r.Read(bytes); err != nil { + return + } + n.Data = bytes[:n.Size] + n.Checksum = NewCRC(n.Data) + case Version2: + bytes := make([]byte, bodyLength) + if _, err = r.Read(bytes); err != nil { + return + } + n.readNeedleDataVersion2(bytes[0:n.Size]) + n.Checksum = NewCRC(n.Data) + default: + err = fmt.Errorf("Unsupported Version! (%d)", version) + } + return +} + +func (n *Needle) IsGzipped() bool { + return n.Flags&FlagGzip > 0 +} +func (n *Needle) SetGzipped() { + n.Flags = n.Flags | FlagGzip +} +func (n *Needle) HasName() bool { + return n.Flags&FlagHasName > 0 +} +func (n *Needle) SetHasName() { + n.Flags = n.Flags | FlagHasName +} +func (n *Needle) HasMime() bool { + return n.Flags&FlagHasMime > 0 +} +func (n *Needle) SetHasMime() { + n.Flags = n.Flags | FlagHasMime +} diff --git a/src/weed/storage/replication_type.go b/src/weed/storage/replication_type.go new file mode 100644 index 000000000..0902d1016 --- /dev/null +++ b/src/weed/storage/replication_type.go @@ -0,0 +1,123 @@ +package storage + +import ( + "errors" +) + +type ReplicationType string + +const ( + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value +) + +func NewReplicationTypeFromString(t string) (ReplicationType, error) { + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + t) +} +func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + string(b)) +} + +func (r *ReplicationType) String() string { + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" +} +func (r *ReplicationType) Byte() byte { + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) +} + +func (repType ReplicationType) GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 +} +func (repType ReplicationType) GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 +} diff --git a/src/weed/storage/sample.idx b/src/weed/storage/sample.idx Binary files differnew file mode 100644 index 000000000..44918b41d --- /dev/null +++ b/src/weed/storage/sample.idx diff --git a/src/weed/storage/store.go b/src/weed/storage/store.go new file mode 100644 index 000000000..b016a6491 --- /dev/null +++ b/src/weed/storage/store.go @@ -0,0 +1,204 @@ +package storage + +import ( + "encoding/json" + "errors" + "io/ioutil" + "log" + "net/url" + "weed/util" + "strconv" + "strings" +) + +type Store struct { + volumes map[VolumeId]*Volume + dir string + Port int + Ip string + PublicUrl string + MaxVolumeCount int + + masterNode string + connected bool + volumeSizeLimit uint64 //read from the master + +} + +func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount} + s.volumes = make(map[VolumeId]*Volume) + s.loadExistingVolumes() + + log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") + return +} +func (s *Store) AddVolume(volumeListString string, replicationType string) error { + rt, e := NewReplicationTypeFromString(replicationType) + if e != nil { + return e + } + for _, range_string := range strings.Split(volumeListString, ",") { + if strings.Index(range_string, "-") < 0 { + id_string := range_string + id, err := NewVolumeId(id_string) + if err != nil { + return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") + } + e = s.addVolume(VolumeId(id), rt) + } else { + pair := strings.Split(range_string, "-") + start, start_err := strconv.ParseUint(pair[0], 10, 64) + if start_err != nil { + return errors.New("Volume Start Id" + pair[0] + " is not a valid unsigned integer!") + } + end, end_err := strconv.ParseUint(pair[1], 10, 64) + if end_err != nil { + return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") + } + for id := start; id <= end; id++ { + if err := s.addVolume(VolumeId(id), rt); err != nil { + e = err + } + } + } + } + return e +} +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { + if s.volumes[vid] != nil { + return errors.New("Volume Id " + vid.String() + " already exists!") + } + log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) + s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) + return err +} + +func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false + } + garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) + if e != nil { + return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false + } + return nil, garbageThreshold < s.volumes[vid].garbageLevel() +} +func (s *Store) CompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + } + return s.volumes[vid].compact() +} +func (s *Store) CommitCompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + } + return s.volumes[vid].commitCompact() +} +func (s *Store) loadExistingVolumes() { + if dirs, err := ioutil.ReadDir(s.dir); err == nil { + for _, dir := range dirs { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + base := name[:len(name)-len(".dat")] + if vid, err := NewVolumeId(base); err == nil { + if s.volumes[vid] == nil { + if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { + s.volumes[vid] = v + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) + } + } + } + } + } + } +} +func (s *Store) Status() []*VolumeInfo { + var stats []*VolumeInfo + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = + VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + stats = append(stats, s) + } + return stats +} + +type JoinResult struct { + VolumeSizeLimit uint64 +} + +func (s *Store) SetMaster(mserver string) { + s.masterNode = mserver +} +func (s *Store) Join() error { + stats := new([]*VolumeInfo) + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = + VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + *stats = append(*stats, s) + } + bytes, _ := json.Marshal(stats) + values := make(url.Values) + if !s.connected { + values.Add("init", "true") + } + values.Add("port", strconv.Itoa(s.Port)) + values.Add("ip", s.Ip) + values.Add("publicUrl", s.PublicUrl) + values.Add("volumes", string(bytes)) + values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) + jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) + if err != nil { + return err + } + var ret JoinResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + s.volumeSizeLimit = ret.VolumeSizeLimit + s.connected = true + return nil +} +func (s *Store) Close() { + for _, v := range s.volumes { + v.Close() + } +} +func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { + if v := s.volumes[i]; v != nil { + size, err = v.write(n) + if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { + log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) + s.Join() + } + return + } + log.Println("volume", i, "not found!") + return +} +func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { + if v := s.volumes[i]; v != nil { + return v.delete(n) + } + return 0, nil +} +func (s *Store) Read(i VolumeId, n *Needle) (int, error) { + if v := s.volumes[i]; v != nil { + return v.read(n) + } + return 0, errors.New("Not Found") +} +func (s *Store) GetVolume(i VolumeId) *Volume { + return s.volumes[i] +} + +func (s *Store) HasVolume(i VolumeId) bool { + _, ok := s.volumes[i] + return ok +} diff --git a/src/weed/storage/volume.go b/src/weed/storage/volume.go new file mode 100644 index 000000000..707c6e6f8 --- /dev/null +++ b/src/weed/storage/volume.go @@ -0,0 +1,274 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + "path" + "sync" +) + +const ( + SuperBlockSize = 8 +) + +type SuperBlock struct { + Version Version + ReplicaType ReplicationType +} + +func (s *SuperBlock) Bytes() []byte { + header := make([]byte, SuperBlockSize) + header[0] = byte(s.Version) + header[1] = s.ReplicaType.Byte() + return header +} + +type Volume struct { + Id VolumeId + dir string + dataFile *os.File + nm *NeedleMap + + SuperBlock + + accessLock sync.Mutex +} + +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { + v = &Volume{dir: dirname, Id: id} + v.SuperBlock = SuperBlock{ReplicaType: replicationType} + e = v.load(true) + return +} +func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) { + v = &Volume{dir: dirname, Id: id} + v.SuperBlock = SuperBlock{ReplicaType: CopyNil} + e = v.load(false) + return +} +func (v *Volume) load(alsoLoadIndex bool) error { + var e error + fileName := path.Join(v.dir, v.Id.String()) + v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + if v.ReplicaType == CopyNil { + if e = v.readSuperBlock(); e != nil { + return e + } + } else { + v.maybeWriteSuperBlock() + } + if alsoLoadIndex { + indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if ie != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + v.nm = LoadNeedleMap(indexFile) + } + return nil +} +func (v *Volume) Version() Version { + return v.SuperBlock.Version +} +func (v *Volume) Size() int64 { + v.accessLock.Lock() + defer v.accessLock.Unlock() + stat, e := v.dataFile.Stat() + if e == nil { + return stat.Size() + } + fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) + return -1 +} +func (v *Volume) Close() { + v.accessLock.Lock() + defer v.accessLock.Unlock() + v.nm.Close() + v.dataFile.Close() +} +func (v *Volume) maybeWriteSuperBlock() { + stat, e := v.dataFile.Stat() + if e != nil { + fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e) + return + } + if stat.Size() == 0 { + v.SuperBlock.Version = CurrentVersion + v.dataFile.Write(v.SuperBlock.Bytes()) + } +} +func (v *Volume) readSuperBlock() (err error) { + v.dataFile.Seek(0, 0) + header := make([]byte, SuperBlockSize) + if _, e := v.dataFile.Read(header); e != nil { + return fmt.Errorf("cannot read superblock: %s", e) + } + v.SuperBlock, err = ParseSuperBlock(header) + return err +} +func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { + superBlock.Version = Version(header[0]) + if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { + err = fmt.Errorf("cannot read replica type: %s", err) + } + return +} +func (v *Volume) NeedToReplicate() bool { + return v.ReplicaType.GetCopyCount() > 1 +} + +func (v *Volume) write(n *Needle) (size uint32, err error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + var offset int64 + if offset, err = v.dataFile.Seek(0, 2); err != nil { + return + } + if size, err = n.Append(v.dataFile, v.Version()); err != nil { + return + } + nv, ok := v.nm.Get(n.Id) + if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { + _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size) + } + return +} +func (v *Volume) delete(n *Needle) (uint32, error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + nv, ok := v.nm.Get(n.Id) + //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) + if ok { + v.nm.Delete(n.Id) + v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0) + _, err := n.Append(v.dataFile, v.Version()) + return nv.Size, err + } + return 0, nil +} + +func (v *Volume) read(n *Needle) (int, error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + nv, ok := v.nm.Get(n.Id) + if ok && nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + return n.Read(v.dataFile, nv.Size, v.Version()) + } + return -1, errors.New("Not Found") +} + +func (v *Volume) garbageLevel() float64 { + return float64(v.nm.deletionByteCounter) / float64(v.ContentSize()) +} + +func (v *Volume) compact() error { + v.accessLock.Lock() + defer v.accessLock.Unlock() + + filePath := path.Join(v.dir, v.Id.String()) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") +} +func (v *Volume) commitCompact() error { + v.accessLock.Lock() + defer v.accessLock.Unlock() + v.dataFile.Close() + var e error + if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { + return e + } + if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil { + return e + } + if e = v.load(true); e != nil { + return e + } + return nil +} + +func ScanVolumeFile(dirname string, id VolumeId, + visitSuperBlock func(SuperBlock) error, + visitNeedle func(n *Needle, offset uint32) error) (err error) { + var v *Volume + if v, err = LoadVolumeOnly(dirname, id); err != nil { + return + } + if err = visitSuperBlock(v.SuperBlock); err != nil { + return + } + + version := v.Version() + + offset := uint32(SuperBlockSize) + n, rest, e := ReadNeedleHeader(v.dataFile, version) + if e != nil { + err = fmt.Errorf("cannot read needle header: %s", e) + return + } + for n != nil { + if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil { + err = fmt.Errorf("cannot read needle body: %s", err) + return + } + if err = visitNeedle(n, offset); err != nil { + return + } + offset += NeedleHeaderSize + rest + if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("cannot read needle header: %s", err) + } + } + + return +} + +func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { + var ( + dst, idx *os.File + ) + if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil { + return + } + defer dst.Close() + + if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil { + return + } + defer idx.Close() + + nm := NewNeedleMap(idx) + new_offset := uint32(SuperBlockSize) + + err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error { + _, err = dst.Write(superBlock.Bytes()) + return err + }, func(n *Needle, offset uint32) error { + nv, ok := v.nm.Get(n.Id) + //log.Println("file size is", n.Size, "rest", rest) + if ok && nv.Offset*NeedlePaddingSize == offset { + if nv.Size > 0 { + if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil { + return fmt.Errorf("cannot put needle: %s", err) + } + if _, err = n.Append(dst, v.Version()); err != nil { + return fmt.Errorf("cannot append needle: %s", err) + } + new_offset += n.DiskSize() + //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest) + } + } + return nil + }) + + return +} +func (v *Volume) ContentSize() uint64 { + return v.nm.fileByteCounter +} diff --git a/src/weed/storage/volume_id.go b/src/weed/storage/volume_id.go new file mode 100644 index 000000000..0333c6cf0 --- /dev/null +++ b/src/weed/storage/volume_id.go @@ -0,0 +1,18 @@ +package storage + +import ( + "strconv" +) + +type VolumeId uint32 + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err +} +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) +} +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) +} diff --git a/src/weed/storage/volume_info.go b/src/weed/storage/volume_info.go new file mode 100644 index 000000000..e4c5f6ec4 --- /dev/null +++ b/src/weed/storage/volume_info.go @@ -0,0 +1,13 @@ +package storage + +import () + +type VolumeInfo struct { + Id VolumeId + Size uint64 + RepType ReplicationType + Version Version + FileCount int + DeleteCount int + DeletedByteCount uint64 +} diff --git a/src/weed/storage/volume_version.go b/src/weed/storage/volume_version.go new file mode 100644 index 000000000..9702ae904 --- /dev/null +++ b/src/weed/storage/volume_version.go @@ -0,0 +1,11 @@ +package storage + +import () + +type Version uint8 + +const ( + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 +) |
