diff options
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/needle_map.go | 77 | ||||
| -rw-r--r-- | go/storage/needle_map_boltdb.go | 26 | ||||
| -rw-r--r-- | go/storage/needle_map_leveldb.go | 26 | ||||
| -rw-r--r-- | go/storage/needle_map_memory.go | 34 | ||||
| -rw-r--r-- | go/storage/needle_read_write.go | 62 | ||||
| -rw-r--r-- | go/storage/store.go | 4 | ||||
| -rw-r--r-- | go/storage/volume.go | 39 | ||||
| -rw-r--r-- | go/storage/volume_sync.go | 213 |
8 files changed, 357 insertions, 124 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 03499fda4..814789616 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -2,7 +2,9 @@ package storage import ( "fmt" + "io/ioutil" "os" + "sync" "github.com/chrislusf/seaweedfs/go/util" ) @@ -26,29 +28,66 @@ type NeedleMapper interface { FileCount() int DeletedCount() int MaxFileKey() uint64 + IndexFileSize() uint64 + IndexFileContent() ([]byte, error) + IndexFileName() string } -type mapMetric struct { - DeletionCounter int `json:"DeletionCounter"` - FileCounter int `json:"FileCounter"` - DeletionByteCounter uint64 `json:"DeletionByteCounter"` - FileByteCounter uint64 `json:"FileByteCounter"` - MaximumFileKey uint64 `json:"MaxFileKey"` +type baseNeedleMapper struct { + indexFile *os.File + indexFileAccessLock sync.Mutex + + mapMetric } -func appendToIndexFile(indexFile *os.File, - key uint64, offset uint32, size uint32) error { +func (nm baseNeedleMapper) IndexFileSize() uint64 { + stat, err := nm.indexFile.Stat() + if err == nil { + return uint64(stat.Size()) + } + return 0 +} + +func (nm baseNeedleMapper) IndexFileName() string { + return nm.indexFile.Name() +} + +func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { + key = util.BytesToUint64(bytes[:8]) + offset = util.BytesToUint32(bytes[8:12]) + size = util.BytesToUint32(bytes[12:16]) + return +} +func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) util.Uint32toBytes(bytes[8:12], offset) util.Uint32toBytes(bytes[12:16], size) - if _, err := indexFile.Seek(0, 2); err != nil { + + nm.indexFileAccessLock.Lock() + defer nm.indexFileAccessLock.Unlock() + if _, err := nm.indexFile.Seek(0, 2); err != nil { return fmt.Errorf("cannot seek end of indexfile %s: %v", - indexFile.Name(), err) + nm.indexFile.Name(), err) } - _, err := indexFile.Write(bytes) + _, err := nm.indexFile.Write(bytes) return err } +func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) { + nm.indexFileAccessLock.Lock() + defer nm.indexFileAccessLock.Unlock() + return ioutil.ReadFile(nm.indexFile.Name()) +} + +type mapMetric struct { + indexFile *os.File + + DeletionCounter int `json:"DeletionCounter"` + FileCounter int `json:"FileCounter"` + DeletionByteCounter uint64 `json:"DeletionByteCounter"` + FileByteCounter uint64 `json:"FileByteCounter"` + MaximumFileKey uint64 `json:"MaxFileKey"` +} func (mm *mapMetric) logDelete(deletedByteCount uint32) { mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) @@ -66,3 +105,19 @@ func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) } } + +func (mm mapMetric) ContentSize() uint64 { + return mm.FileByteCounter +} +func (mm mapMetric) DeletedSize() uint64 { + return mm.DeletionByteCounter +} +func (mm mapMetric) FileCount() int { + return mm.FileCounter +} +func (mm mapMetric) DeletedCount() int { + return mm.DeletionCounter +} +func (mm mapMetric) MaxFileKey() uint64 { + return mm.MaximumFileKey +} diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go index bed6e58d1..e95c016bb 100644 --- a/go/storage/needle_map_boltdb.go +++ b/go/storage/needle_map_boltdb.go @@ -12,15 +12,15 @@ import ( type BoltDbNeedleMap struct { dbFileName string - indexFile *os.File db *bolt.DB - mapMetric + baseNeedleMapper } var boltdbBucket = []byte("weed") func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { - m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + m = &BoltDbNeedleMap{dbFileName: dbFileName} + m.indexFile = indexFile if !isBoltDbFresh(dbFileName, indexFile) { glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) generateBoltDbFile(dbFileName, indexFile) @@ -101,7 +101,7 @@ func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { } m.logPut(key, oldSize, size) // write to index file first - if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + if err := m.appendToIndexFile(key, offset, size); err != nil { return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) } return boltDbWrite(m.db, key, offset, size) @@ -148,7 +148,7 @@ func (m *BoltDbNeedleMap) Delete(key uint64) error { m.logDelete(oldNeedle.Size) } // write to index file first - if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + if err := m.appendToIndexFile(key, 0, 0); err != nil { return err } return boltDbDelete(m.db, key) @@ -163,19 +163,3 @@ func (m *BoltDbNeedleMap) Destroy() error { os.Remove(m.indexFile.Name()) return os.Remove(m.dbFileName) } - -func (m *BoltDbNeedleMap) ContentSize() uint64 { - return m.FileByteCounter -} -func (m *BoltDbNeedleMap) DeletedSize() uint64 { - return m.DeletionByteCounter -} -func (m *BoltDbNeedleMap) FileCount() int { - return m.FileCounter -} -func (m *BoltDbNeedleMap) DeletedCount() int { - return m.DeletionCounter -} -func (m *BoltDbNeedleMap) MaxFileKey() uint64 { - return m.MaximumFileKey -} diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go index 65377a8a8..47f63e3ae 100644 --- a/go/storage/needle_map_leveldb.go +++ b/go/storage/needle_map_leveldb.go @@ -12,13 +12,13 @@ import ( type LevelDbNeedleMap struct { dbFileName string - indexFile *os.File db *leveldb.DB - mapMetric + baseNeedleMapper } func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { - m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + m = &LevelDbNeedleMap{dbFileName: dbFileName} + m.indexFile = indexFile if !isLevelDbFresh(dbFileName, indexFile) { glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) generateLevelDbFile(dbFileName, indexFile) @@ -89,7 +89,7 @@ func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { } m.logPut(key, oldSize, size) // write to index file first - if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + if err := m.appendToIndexFile(key, offset, size); err != nil { return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) } return levelDbWrite(m.db, key, offset, size) @@ -117,7 +117,7 @@ func (m *LevelDbNeedleMap) Delete(key uint64) error { m.logDelete(oldNeedle.Size) } // write to index file first - if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + if err := m.appendToIndexFile(key, 0, 0); err != nil { return err } return levelDbDelete(m.db, key) @@ -132,19 +132,3 @@ func (m *LevelDbNeedleMap) Destroy() error { os.Remove(m.indexFile.Name()) return os.Remove(m.dbFileName) } - -func (m *LevelDbNeedleMap) ContentSize() uint64 { - return m.FileByteCounter -} -func (m *LevelDbNeedleMap) DeletedSize() uint64 { - return m.DeletionByteCounter -} -func (m *LevelDbNeedleMap) FileCount() int { - return m.FileCounter -} -func (m *LevelDbNeedleMap) DeletedCount() int { - return m.DeletionCounter -} -func (m *LevelDbNeedleMap) MaxFileKey() uint64 { - return m.MaximumFileKey -} diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go index f0c30c9f2..2b1fc1b54 100644 --- a/go/storage/needle_map_memory.go +++ b/go/storage/needle_map_memory.go @@ -5,21 +5,19 @@ import ( "os" "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" ) type NeedleMap struct { - indexFile *os.File - m CompactMap + m CompactMap - mapMetric + baseNeedleMapper } func NewNeedleMap(file *os.File) *NeedleMap { nm := &NeedleMap{ - m: NewCompactMap(), - indexFile: file, + m: NewCompactMap(), } + nm.indexFile = file return nm } @@ -70,9 +68,7 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e for count > 0 && e == nil || e == io.EOF { for i = 0; i+16 <= count; i += 16 { - key = util.BytesToUint64(bytes[i : i+8]) - offset = util.BytesToUint32(bytes[i+8 : i+12]) - size = util.BytesToUint32(bytes[i+12 : i+16]) + key, offset, size = idxFileEntry(bytes[i : i+16]) if e = fn(key, offset, size); e != nil { return e } @@ -90,7 +86,7 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { oldSize := nm.m.Set(Key(key), offset, size) nm.logPut(key, oldSize, size) - return appendToIndexFile(nm.indexFile, key, offset, size) + return nm.appendToIndexFile(key, offset, size) } func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { element, ok = nm.m.Get(Key(key)) @@ -99,7 +95,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { func (nm *NeedleMap) Delete(key uint64) error { deletedBytes := nm.m.Delete(Key(key)) nm.logDelete(deletedBytes) - return appendToIndexFile(nm.indexFile, key, 0, 0) + return nm.appendToIndexFile(key, 0, 0) } func (nm *NeedleMap) Close() { _ = nm.indexFile.Close() @@ -108,19 +104,3 @@ func (nm *NeedleMap) Destroy() error { nm.Close() return os.Remove(nm.indexFile.Name()) } -func (nm NeedleMap) ContentSize() uint64 { - return nm.FileByteCounter -} -func (nm NeedleMap) DeletedSize() uint64 { - return nm.DeletionByteCounter -} -func (nm NeedleMap) FileCount() int { - return nm.FileCounter -} -func (nm NeedleMap) DeletedCount() int { - return nm.DeletionCounter -} - -func (nm NeedleMap) MaxFileKey() uint64 { - return nm.MaximumFileKey -} diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 20d3a093f..9a9f63ddb 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -135,49 +135,37 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return 0, fmt.Errorf("Unsupported Version! (%d)", version) } -func (n *Needle) Read(r *os.File, offset int64, size uint32, version Version) (ret int, err error) { +func ReadNeedleBlob(r *os.File, offset int64, size uint32) (bytes []byte, err error) { + padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) + bytes = make([]byte, NeedleHeaderSize+size+NeedleChecksumSize+padding) + _, err = r.ReadAt(bytes, offset) + return +} + +func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { + bytes, err := ReadNeedleBlob(r, offset, size) + if err != nil { + return err + } + n.ParseNeedleHeader(bytes) + if n.Size != size { + return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) + } switch version { case Version1: - bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) - if ret, err = r.ReadAt(bytes, offset); err != nil { - return - } - n.readNeedleHeader(bytes) n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) - newChecksum := NewCRC(n.Data) - if checksum != newChecksum.Value() { - return 0, errors.New("CRC error! Data On Disk Corrupted") - } - n.Checksum = newChecksum - return case Version2: - if size == 0 { - return 0, nil - } - bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) - if ret, err = r.ReadAt(bytes, offset); err != nil { - return - } - if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) { - return 0, errors.New("File Entry Not Found") - } - n.readNeedleHeader(bytes) - if n.Size != size { - return 0, fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) - } n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize]) - newChecksum := NewCRC(n.Data) - if checksum != newChecksum.Value() { - return 0, errors.New("CRC Found Data On Disk Corrupted") - } - n.Checksum = newChecksum - return } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + newChecksum := NewCRC(n.Data) + if checksum != newChecksum.Value() { + return errors.New("CRC error! Data On Disk Corrupted") + } + n.Checksum = newChecksum + return nil } -func (n *Needle) readNeedleHeader(bytes []byte) { +func (n *Needle) ParseNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) n.Id = util.BytesToUint64(bytes[4:12]) n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) @@ -228,7 +216,7 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bod if count <= 0 || err != nil { return nil, 0, err } - n.readNeedleHeader(bytes) + n.ParseNeedleHeader(bytes) padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) bodyLength = n.Size + NeedleChecksumSize + padding } diff --git a/go/storage/store.go b/go/storage/store.go index 54a6f9c69..425675c8b 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -371,9 +371,9 @@ func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { } return 0, nil } -func (s *Store) Read(i VolumeId, n *Needle) (int, error) { +func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { if v := s.findVolume(i); v != nil { - return v.read(n) + return v.readNeedle(n) } return 0, fmt.Errorf("Volume %v not found!", i) } diff --git a/go/storage/volume.go b/go/storage/volume.go index e4cebea7c..0e6cadecc 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -54,6 +54,9 @@ func (v *Volume) FileName() (fileName string) { } return } +func (v *Volume) DataFile() *os.File { + return v.dataFile +} func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error { var e error fileName := v.FileName() @@ -152,7 +155,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { nv, ok := v.nm.Get(n.Id) if ok && nv.Offset > 0 { oldNeedle := new(Needle) - _, err := oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if err != nil { glog.V(0).Infof("Failed to check updated file %v", err) return false @@ -180,6 +183,30 @@ func (v *Volume) Destroy() (err error) { return } +// AppendBlob append a blob to end of the data file, used in replication +func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + return + } + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if offset, err = v.dataFile.Seek(0, 2); err != nil { + glog.V(0).Infof("failed to seek the end of file: %v", err) + return + } + //ensure file writing starting from aligned positions + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + if offset, err = v.dataFile.Seek(offset, 0); err != nil { + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) + return + } + } + v.dataFile.Write(b) + return +} + func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { @@ -250,17 +277,19 @@ func (v *Volume) delete(n *Needle) (uint32, error) { return 0, nil } -func (v *Volume) read(n *Needle) (int, error) { +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedle(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) if !ok || nv.Offset == 0 { return -1, errors.New("Not Found") } - bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if err != nil { - return bytesRead, err + return 0, err } + bytesRead := len(n.Data) if !n.HasTtl() { - return bytesRead, err + return bytesRead, nil } ttlMinutes := n.Ttl.Minutes() if ttlMinutes == 0 { diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go new file mode 100644 index 000000000..2c72d62f0 --- /dev/null +++ b/go/storage/volume_sync.go @@ -0,0 +1,213 @@ +package storage + +import ( + "fmt" + "io" + "io/ioutil" + "net/url" + "os" + "sort" + "strconv" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/util" +) + +// The volume sync with a master volume via 2 steps: +// 1. The slave checks master side to find subscription checkpoint +// to setup the replication. +// 2. The slave receives the updates from master + +/* +Assume the slave volume needs to follow the master volume. + +The master volume could be compacted, and could be many files ahead of +slave volume. + +Step 1: +The slave volume will ask the master volume for a snapshot +of (existing file entries, last offset, number of compacted times). + +For each entry x in master existing file entries: + if x does not exist locally: + add x locally + +For each entry y in local slave existing file entries: + if y does not exist on master: + delete y locally + +Step 2: +After this, use the last offset and number of compacted times to request +the master volume to send a new file, and keep looping. If the number of +compacted times is changed, go back to step 1 (very likely this can be +optimized more later). + +*/ + +func (v *Volume) Synchronize(volumeServer string) (err error) { + var lastCompactRevision uint16 = 0 + var compactRevision uint16 = 0 + var masterMap CompactMap + for i := 0; i < 3; i++ { + if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { + return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) + } + if lastCompactRevision != compactRevision && lastCompactRevision != 0 { + if err = v.Compact(); err != nil { + return fmt.Errorf("Compact Volume before synchronizing %v", err) + } + if err = v.commitCompact(); err != nil { + return fmt.Errorf("Commit Compact before synchronizing %v", err) + } + } + lastCompactRevision = compactRevision + if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil { + return + } + } + return +} + +type ByOffset []NeedleValue + +func (a ByOffset) Len() int { return len(a) } +func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } + +// trySynchronizing sync with remote volume server incrementally by +// make up the local and remote delta. +func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error { + slaveIdxFile, err := os.Open(v.nm.IndexFileName()) + if err != nil { + return fmt.Errorf("Open volume %d index file: %v", v.Id, err) + } + defer slaveIdxFile.Close() + slaveMap, err := LoadNeedleMap(slaveIdxFile) + if err != nil { + return fmt.Errorf("Load volume %d index file: %v", v.Id, err) + } + var delta []NeedleValue + if err := masterMap.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok { + return nil // skip intersection + } + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Add master entry: %v", err) + } + if err := slaveMap.m.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := masterMap.Get(needleValue.Key); ok { + return nil // skip intersection + } + needleValue.Size = 0 + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Remove local entry: %v", err) + } + + // simulate to same ordering of remote .dat file needle entries + sort.Sort(ByOffset(delta)) + + // make up the delta + fetchCount := 0 + volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data" + for _, needleValue := range delta { + if needleValue.Size == 0 { + // remove file entry from local + v.removeNeedle(needleValue.Key) + continue + } + // add master file entry to local data file + if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil { + glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err) + return err + } + fetchCount++ + } + glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer) + return nil +} + +func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) { + m = NewCompactMap() + + syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + return m, 0, 0, err + } + + total := 0 + err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { + // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) + if offset != 0 && size != 0 { + m.Set(Key(key), offset, size) + } else { + m.Delete(Key(key)) + } + total++ + }) + + glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision) + return m, syncStatus.TailOffset, syncStatus.CompactRevision, err + +} + +func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse { + var syncStatus = operation.SyncVolumeResponse{} + if stat, err := v.dataFile.Stat(); err == nil { + syncStatus.TailOffset = uint64(stat.Size()) + } + syncStatus.IdxFileSize = v.nm.IndexFileSize() + syncStatus.CompactRevision = v.SuperBlock.CompactRevision + syncStatus.Ttl = v.SuperBlock.Ttl.String() + syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() + return syncStatus +} + +func (v *Volume) IndexFileContent() ([]byte, error) { + return v.nm.IndexFileContent() +} + +// removeNeedle removes one needle by needle key +func (v *Volume) removeNeedle(key Key) { + n := new(Needle) + n.Id = uint64(key) + v.delete(n) +} + +// fetchNeedle fetches a remote volume needle by vid, id, offset +// The compact revision is checked first in case the remote volume +// is compacted and the offset is invalid any more. +func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, + needleValue NeedleValue, compactRevision uint16) error { + // add master file entry to local data file + values := make(url.Values) + values.Add("revision", strconv.Itoa(int(compactRevision))) + values.Add("volume", v.Id.String()) + values.Add("id", needleValue.Key.String()) + values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10)) + values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10)) + glog.V(4).Infof("Fetch %+v", needleValue) + return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err) + } + offset, err := v.AppendBlob(b) + if err != nil { + return fmt.Errorf("Appending volume %d error: %v", v.Id, err) + } + // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size) + v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size) + return nil + }) +} |
