aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_sync.go')
-rw-r--r--weed/storage/volume_sync.go213
1 files changed, 213 insertions, 0 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
new file mode 100644
index 000000000..231ff31c2
--- /dev/null
+++ b/weed/storage/volume_sync.go
@@ -0,0 +1,213 @@
+package storage
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/url"
+ "os"
+ "sort"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// A volume synchronizes with a master volume in 2 steps:
+// 1. The slave checks the master side to find the subscription checkpoint
+//    needed to set up the replication.
+// 2. The slave receives the updates from the master.
+
+/*
+Assume the slave volume needs to follow the master volume.
+
+The master volume could be compacted, and could be many files ahead of
+slave volume.
+
+Step 1:
+The slave volume will ask the master volume for a snapshot
+of (existing file entries, last offset, number of compacted times).
+
+For each entry x in master existing file entries:
+ if x does not exist locally:
+ add x locally
+
+For each entry y in local slave existing file entries:
+ if y does not exist on master:
+ delete y locally
+
+Step 2:
+After this, use the last offset and number of compacted times to request
+the master volume to send a new file, and keep looping. If the number of
+compacted times is changed, go back to step 1 (very likely this can be
+optimized more later).
+
+*/
+
+func (v *Volume) Synchronize(volumeServer string) (err error) {
+ var lastCompactRevision uint16 = 0
+ var compactRevision uint16 = 0
+ var masterMap CompactMap
+ for i := 0; i < 3; i++ {
+ if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
+ return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
+ }
+ if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
+ if err = v.Compact(); err != nil {
+ return fmt.Errorf("Compact Volume before synchronizing %v", err)
+ }
+ if err = v.commitCompact(); err != nil {
+ return fmt.Errorf("Commit Compact before synchronizing %v", err)
+ }
+ }
+ lastCompactRevision = compactRevision
+ if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
+ return
+ }
+ }
+ return
+}
+
// ByOffset implements sort.Interface over NeedleValue entries, ordering them
// by their offset in the .dat file so fetched needles can be applied in the
// same order they are laid out in the master's data file.
type ByOffset []NeedleValue

func (a ByOffset) Len() int { return len(a) }
func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
+
// trySynchronizing synchronizes incrementally with the remote volume server:
// it reloads the local (slave) index from disk, computes the delta against
// the master's entries, then deletes slave-only needles locally and fetches
// master-only needles over HTTP.
func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
	// Re-read the index file so the slave's view reflects what is on disk.
	slaveIdxFile, err := os.Open(v.nm.IndexFileName())
	if err != nil {
		return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
	}
	defer slaveIdxFile.Close()
	slaveMap, err := LoadNeedleMap(slaveIdxFile)
	if err != nil {
		return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
	}
	// delta accumulates the differences: entries kept with their master Size
	// must be fetched; entries re-marked with Size == 0 (below) must be
	// deleted locally.
	var delta []NeedleValue
	if err := masterMap.Visit(func(needleValue NeedleValue) error {
		if needleValue.Key == 0 {
			return nil
		}
		if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok {
			return nil // skip intersection
		}
		// Present on master, missing locally: schedule a fetch.
		delta = append(delta, needleValue)
		return nil
	}); err != nil {
		return fmt.Errorf("Add master entry: %v", err)
	}
	if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
		if needleValue.Key == 0 {
			return nil
		}
		if _, ok := masterMap.Get(needleValue.Key); ok {
			return nil // skip intersection
		}
		// Present locally, gone on master: Size 0 marks it for deletion in
		// the apply loop below.
		needleValue.Size = 0
		delta = append(delta, needleValue)
		return nil
	}); err != nil {
		return fmt.Errorf("Remove local entry: %v", err)
	}

	// Sort by offset so needles are appended in the same order as they
	// appear in the remote .dat file.
	sort.Sort(ByOffset(delta))

	// Apply the delta: delete tombstoned entries, fetch the rest.
	fetchCount := 0
	volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
	for _, needleValue := range delta {
		if needleValue.Size == 0 {
			// remove file entry from local
			v.removeNeedle(needleValue.Key)
			continue
		}
		// add master file entry to local data file
		if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
			glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
			return err
		}
		fetchCount++
	}
	glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
	return nil
}
+
+func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
+ m = NewCompactMap()
+
+ syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
+ if err != nil {
+ return m, 0, 0, err
+ }
+
+ total := 0
+ err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
+ // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
+ if offset != 0 && size != 0 {
+ m.Set(Key(key), offset, size)
+ } else {
+ m.Delete(Key(key))
+ }
+ total++
+ })
+
+ glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
+ return m, syncStatus.TailOffset, syncStatus.CompactRevision, err
+
+}
+
+func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
+ var syncStatus = operation.SyncVolumeResponse{}
+ if stat, err := v.dataFile.Stat(); err == nil {
+ syncStatus.TailOffset = uint64(stat.Size())
+ }
+ syncStatus.IdxFileSize = v.nm.IndexFileSize()
+ syncStatus.CompactRevision = v.SuperBlock.CompactRevision
+ syncStatus.Ttl = v.SuperBlock.Ttl.String()
+ syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
+ return syncStatus
+}
+
// IndexFileContent returns the raw bytes of this volume's needle index file,
// delegating to the needle map.
func (v *Volume) IndexFileContent() ([]byte, error) {
	return v.nm.IndexFileContent()
}
+
+// removeNeedle removes one needle by needle key
+func (v *Volume) removeNeedle(key Key) {
+ n := new(Needle)
+ n.Id = uint64(key)
+ v.delete(n)
+}
+
// fetchNeedle fetches one remote needle's bytes by volume id, needle id and
// offset, appends them to the local data file, and records the new local
// offset in the needle map. The compact revision is sent so the remote side
// can reject the request if the volume was compacted and the offset is no
// longer valid.
func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
	needleValue NeedleValue, compactRevision uint16) error {
	// Query parameters identify the needle and guard against stale offsets.
	values := make(url.Values)
	values.Add("revision", strconv.Itoa(int(compactRevision)))
	values.Add("volume", v.Id.String())
	values.Add("id", needleValue.Key.String())
	values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
	values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
	glog.V(4).Infof("Fetch %+v", needleValue)
	return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
		// Buffer the whole needle body, then append it as one blob.
		b, err := ioutil.ReadAll(r)
		if err != nil {
			return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
		}
		offset, err := v.AppendBlob(b)
		if err != nil {
			return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
		}
		// println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
		// Record the needle at its new local offset; offsets are stored in
		// NeedlePaddingSize units.
		v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size)
		return nil
	})
}