author    Chris Lu <chris.lu@gmail.com>  2019-04-17 22:04:49 -0700
committer Chris Lu <chris.lu@gmail.com>  2019-04-17 22:04:49 -0700
commit    13ad5c196656c586e908defce9b6d8e717663625 (patch)
tree      4ce98a52e30c52e4319699e0813612e389bb1d6c /weed/storage/volume_backup.go
parent    3b9446606a5e74130677dcc7d0411865a4e144fc (diff)
refactoring
Diffstat (limited to 'weed/storage/volume_backup.go')
-rw-r--r--  weed/storage/volume_backup.go  253
1 file changed, 253 insertions, 0 deletions
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
new file mode 100644
index 000000000..cb478c427
--- /dev/null
+++ b/weed/storage/volume_backup.go
@@ -0,0 +1,253 @@
+package storage
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	. "github.com/chrislusf/seaweedfs/weed/storage/types"
+	"google.golang.org/grpc"
+)
+
+// GetVolumeSyncStatus reports the volume's current tail offset, index file size,
+// compact revision, TTL, and replica placement, for use by a syncing slave.
+func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
+ var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
+ if stat, err := v.dataFile.Stat(); err == nil {
+ syncStatus.TailOffset = uint64(stat.Size())
+ }
+ syncStatus.Collection = v.Collection
+ syncStatus.IdxFileSize = v.nm.IndexFileSize()
+ syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
+ syncStatus.Ttl = v.SuperBlock.Ttl.String()
+ syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
+ return syncStatus
+}
+
+// The volume syncs with a master volume in 2 steps:
+// 1. The slave checks the master side to find the subscription checkpoint
+//    to set up the replication.
+// 2. The slave receives the updates from the master.
+
+/*
+Assume the slave volume needs to follow the master volume.
+
+The master volume could have been compacted, and could be far ahead of the
+slave volume.
+
+Step 0: // implemented in command/backup.go, to avoid .dat file size overflow.
+0.1 If the slave's compact revision is less than the master's, do a local compaction, and set
+the local compact revision to the same as the master's.
+0.2 If the slave size is still bigger than the master's, discard the local copy and do a full copy.
+
+Step 1:
+The slave volume asks the master with the appendAtNs timestamp t of its last entry.
+The master does a binary search in the volume (using the .idx file as an array, and checking the appendAtNs in the .dat file)
+to find the first entry with appendAtNs > t.
+
+Step 2:
+The master sends the content bytes to the slave. The bytes are not chunked by needle.
+
+Step 3:
+The slave generates the needle map for the new bytes. (This may be optimized to incrementally
+update the needle map while receiving new .dat bytes, but that seems unnecessary for now.)
+
+*/
+
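+// incrementalBackupExample is a minimal sketch (not part of the original
+// command/backup.go flow) showing the slave-side call described above; the
+// volume server address below is a placeholder.
+func incrementalBackupExample(v *Volume) error {
+	// WithInsecure keeps the sketch self-contained; real callers pass in
+	// whatever grpc.DialOption their security setup requires.
+	grpcDialOption := grpc.WithInsecure()
+	// Pull everything the master appended after our last appendAtNs.
+	return v.IncrementalBackup("localhost:18080", grpcDialOption)
+}
+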
+// IncrementalBackup pulls, from the given volume server, the bytes appended
+// after this volume's last appendAtNs, appends them to the local .dat file,
+// and then updates the needle map for the new entries.
+func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
+
+ ctx := context.Background()
+
+ startFromOffset := v.Size()
+ appendAtNs, err := v.findLastAppendAtNs()
+ if err != nil {
+ return err
+ }
+
+ err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
+ VolumeId: uint32(v.Id),
+ Since: appendAtNs,
+ })
+ if err != nil {
+ return err
+ }
+
+		if _, seekErr := v.dataFile.Seek(startFromOffset, io.SeekStart); seekErr != nil {
+			return seekErr
+		}
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ _, writeErr := v.dataFile.Write(resp.FileContent)
+ if writeErr != nil {
+ return writeErr
+ }
+ }
+
+ return nil
+
+ })
+
+ if err != nil {
+ return err
+ }
+
+ // add to needle map
+ return ScanVolumeFileFrom(v.version, v.dataFile, startFromOffset, &VolumeFileScanner4GenIdx{v: v})
+
+}
+
+func (v *Volume) findLastAppendAtNs() (uint64, error) {
+ offset, err := v.locateLastAppendEntry()
+ if err != nil {
+ return 0, err
+ }
+ if offset.IsZero() {
+ return 0, nil
+ }
+ return v.readAppendAtNs(offset)
+}
+
+func (v *Volume) locateLastAppendEntry() (Offset, error) {
+ indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
+ if e != nil {
+ return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
+ }
+ defer indexFile.Close()
+
+ fi, err := indexFile.Stat()
+ if err != nil {
+ return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
+ }
+ fileSize := fi.Size()
+ if fileSize%NeedleEntrySize != 0 {
+ return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
+ }
+ if fileSize == 0 {
+ return Offset{}, nil
+ }
+
+ bytes := make([]byte, NeedleEntrySize)
+ n, e := indexFile.ReadAt(bytes, fileSize-NeedleEntrySize)
+ if n != NeedleEntrySize {
+ return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
+ }
+ _, offset, _ := IdxFileEntry(bytes)
+
+ return offset, nil
+}
+
+func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
+
+ n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset())
+ if err != nil {
+ return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
+ }
+ err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleEntrySize), bodyLength)
+ if err != nil {
+ return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
+ }
+ return n.AppendAtNs, nil
+
+}
+
+// BinarySearchByAppendAtNs runs on the server side: it returns the offset of the
+// first index entry whose needle has appendAtNs > sinceNs, or isLast = true when
+// every entry is at or before sinceNs.
+func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
+ indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
+ if openErr != nil {
+ err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
+ return
+ }
+ defer indexFile.Close()
+
+ fi, statErr := indexFile.Stat()
+ if statErr != nil {
+ err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
+ return
+ }
+ fileSize := fi.Size()
+ if fileSize%NeedleEntrySize != 0 {
+ err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
+ return
+ }
+
+ bytes := make([]byte, NeedleEntrySize)
+ entryCount := fileSize / NeedleEntrySize
+ l := int64(0)
+ h := entryCount
+
+ for l < h {
+
+ m := (l + h) / 2
+
+ if m == entryCount {
+ return Offset{}, true, nil
+ }
+
+ // read the appendAtNs for entry m
+ offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
+ if err != nil {
+ return
+ }
+
+ mNs, nsReadErr := v.readAppendAtNs(offset)
+ if nsReadErr != nil {
+ err = nsReadErr
+ return
+ }
+
+ // move the boundary
+ if mNs <= sinceNs {
+ l = m + 1
+ } else {
+ h = m
+ }
+
+ }
+
+ if l == entryCount {
+ return Offset{}, true, nil
+ }
+
+ offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
+
+ return offset, false, err
+
+}
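+
+// firstEntryAfter is a self-contained sketch of the same lower-bound search over
+// an in-memory slice of appendAtNs values; it is illustrative only, since the
+// function above reads the .idx and .dat files instead of a slice.
+func firstEntryAfter(appendAtNs []uint64, sinceNs uint64) int {
+	l, h := 0, len(appendAtNs)
+	for l < h {
+		m := (l + h) / 2
+		if appendAtNs[m] <= sinceNs {
+			l = m + 1 // entry m is at or before sinceNs, look to the right
+		} else {
+			h = m // entry m is after sinceNs, so it could be the answer
+		}
+	}
+	return l // equals len(appendAtNs) when every entry is <= sinceNs
+}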
+
+// bytes is of size NeedleEntrySize
+func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
+ if _, readErr := indexFile.ReadAt(bytes, m*NeedleEntrySize); readErr != nil && readErr != io.EOF {
+ return Offset{}, readErr
+ }
+ _, offset, _ := IdxFileEntry(bytes)
+ return offset, nil
+}
+
+// VolumeFileScanner4GenIdx scans a .dat file to (re)generate the needle map (.idx) entries.
+type VolumeFileScanner4GenIdx struct {
+ v *Volume
+}
+
+func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
+ return nil
+}
+
+func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
+ return false
+}
+
+func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *Needle, offset int64) error {
+ if n.Size > 0 && n.Size != TombstoneFileSize {
+ return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
+ }
+ return scanner.v.nm.Delete(n.Id, ToOffset(offset))
+}