diff options
Diffstat (limited to 'weed/storage/volume_sync.go')
| -rw-r--r-- | weed/storage/volume_sync.go | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go new file mode 100644 index 000000000..231ff31c2 --- /dev/null +++ b/weed/storage/volume_sync.go @@ -0,0 +1,213 @@ +package storage + +import ( + "fmt" + "io" + "io/ioutil" + "net/url" + "os" + "sort" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// A slave volume syncs with a master volume in 2 steps: +// 1. The slave checks the master side to find the subscription checkpoint +// to set up the replication. +// 2. The slave receives the updates from the master. + +/* +Assume the slave volume needs to follow the master volume. + +The master volume could be compacted, and could be many files ahead of +the slave volume. + +Step 1: +The slave volume will ask the master volume for a snapshot +of (existing file entries, last offset, number of compacted times). + +For each entry x in master existing file entries: + if x does not exist locally: + add x locally + +For each entry y in local slave existing file entries: + if y does not exist on master: + delete y locally + +Step 2: +After this, use the last offset and number of compacted times to request +the master volume to send a new file, and keep looping. If the number of +compacted times is changed, go back to step 1 (very likely this can be +optimized more later). 
+ +*/ + +func (v *Volume) Synchronize(volumeServer string) (err error) { + var lastCompactRevision uint16 = 0 + var compactRevision uint16 = 0 + var masterMap CompactMap + for i := 0; i < 3; i++ { + if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { + return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) + } + if lastCompactRevision != compactRevision && lastCompactRevision != 0 { + if err = v.Compact(); err != nil { + return fmt.Errorf("Compact Volume before synchronizing %v", err) + } + if err = v.commitCompact(); err != nil { + return fmt.Errorf("Commit Compact before synchronizing %v", err) + } + } + lastCompactRevision = compactRevision + if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil { + return + } + } + return +} + +type ByOffset []NeedleValue + +func (a ByOffset) Len() int { return len(a) } +func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } + +// trySynchronizing sync with remote volume server incrementally by +// make up the local and remote delta. 
+func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error { + slaveIdxFile, err := os.Open(v.nm.IndexFileName()) + if err != nil { + return fmt.Errorf("Open volume %d index file: %v", v.Id, err) + } + defer slaveIdxFile.Close() + slaveMap, err := LoadNeedleMap(slaveIdxFile) + if err != nil { + return fmt.Errorf("Load volume %d index file: %v", v.Id, err) + } + var delta []NeedleValue + if err := masterMap.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok { + return nil // skip intersection + } + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Add master entry: %v", err) + } + if err := slaveMap.m.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := masterMap.Get(needleValue.Key); ok { + return nil // skip intersection + } + needleValue.Size = 0 + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Remove local entry: %v", err) + } + + // simulate to same ordering of remote .dat file needle entries + sort.Sort(ByOffset(delta)) + + // make up the delta + fetchCount := 0 + volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data" + for _, needleValue := range delta { + if needleValue.Size == 0 { + // remove file entry from local + v.removeNeedle(needleValue.Key) + continue + } + // add master file entry to local data file + if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil { + glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err) + return err + } + fetchCount++ + } + glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer) + return nil +} + +func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) { + m = NewCompactMap() + + 
syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + return m, 0, 0, err + } + + total := 0 + err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { + // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) + if offset != 0 && size != 0 { + m.Set(Key(key), offset, size) + } else { + m.Delete(Key(key)) + } + total++ + }) + + glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision) + return m, syncStatus.TailOffset, syncStatus.CompactRevision, err + +} + +func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse { + var syncStatus = operation.SyncVolumeResponse{} + if stat, err := v.dataFile.Stat(); err == nil { + syncStatus.TailOffset = uint64(stat.Size()) + } + syncStatus.IdxFileSize = v.nm.IndexFileSize() + syncStatus.CompactRevision = v.SuperBlock.CompactRevision + syncStatus.Ttl = v.SuperBlock.Ttl.String() + syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() + return syncStatus +} + +func (v *Volume) IndexFileContent() ([]byte, error) { + return v.nm.IndexFileContent() +} + +// removeNeedle removes one needle by needle key +func (v *Volume) removeNeedle(key Key) { + n := new(Needle) + n.Id = uint64(key) + v.delete(n) +} + +// fetchNeedle fetches a remote volume needle by vid, id, offset +// The compact revision is checked first in case the remote volume +// is compacted and the offset is invalid any more. 
+func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, + needleValue NeedleValue, compactRevision uint16) error { + // add master file entry to local data file + values := make(url.Values) + values.Add("revision", strconv.Itoa(int(compactRevision))) + values.Add("volume", v.Id.String()) + values.Add("id", needleValue.Key.String()) + values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10)) + values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10)) + glog.V(4).Infof("Fetch %+v", needleValue) + return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err) + } + offset, err := v.AppendBlob(b) + if err != nil { + return fmt.Errorf("Appending volume %d error: %v", v.Id, err) + } + // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size) + v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size) + return nil + }) +} |
