From ad12f778912273bb0bd35f0a84fc8093fb6f595d Mon Sep 17 00:00:00 2001 From: James Hartig Date: Wed, 22 Jul 2020 15:02:36 -0400 Subject: unmaintained: Added diff_volume_servers --- .../diff_volume_servers/diff_volume_servers.go | 194 +++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 unmaintained/diff_volume_servers/diff_volume_servers.go (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go new file mode 100644 index 000000000..0d5bf9ab4 --- /dev/null +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -0,0 +1,194 @@ +package main + +import ( + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "math" + "os" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +var ( + serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against") + volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers") + volumeCollection = flag.String("collection", "", "the volume collection name") + grpcDialOption grpc.DialOption +) + +/* + Diff the volume's files across multiple volume servers. + diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5 + + Example Output: + reference 127.0.0.1:8081 + fileId volumeServer message + 5,01617c3f61 127.0.0.1:8080 wrongSize +*/ +func main() { + flag.Parse() + + util.LoadConfiguration("security", false) + grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + vid := uint32(*volumeId) + servers := strings.Split(*serversStr, ",") + if len(servers) < 2 { + glog.Fatalf("You must specify more than 1 server\n") + } + var referenceServer string + var maxOffset int64 + allFiles := map[string]map[types.NeedleId]needleState{} + for _, addr := range servers { + files, offset, err := getVolumeFiles(vid, addr) + if err != nil { + glog.Fatalf("Failed to copy idx from volume server %s\n", err) + } + allFiles[addr] = files + if offset > maxOffset { + referenceServer = addr + } + } + + same := true + fmt.Println("reference", referenceServer) + fmt.Println("fileId volumeServer message") + for nid, n := range allFiles[referenceServer] { + for addr, files := range allFiles { + if addr == referenceServer { + continue + } + var diffMsg string + n2, ok := files[nid] + if !ok { + if n.state == stateDeleted { + continue + } + diffMsg = "missing" + } else if n2.state != n.state { + switch n.state { + case stateDeleted: + diffMsg = "notDeleted" + case statePresent: + diffMsg = "deleted" + } + } else if n2.size != n.size { + diffMsg = "wrongSize" + } else { + continue + } + same = false + + // fetch the needle details + var id string + var err error + if n.state == statePresent { + id, err = getNeedleFileId(vid, nid, referenceServer) + } else { + id, err = getNeedleFileId(vid, nid, addr) + } + if err != nil { + glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err) + } + fmt.Println(id, addr, diffMsg) + } + } + if !same { + os.Exit(1) + } +} + +const ( + stateDeleted uint8 = 1 + statePresent uint8 = 2 +) + +type needleState struct { + state uint8 + size uint32 +} + +func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { + var idxFile *bytes.Reader + err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: v, + Ext: ".idx", + CompactionRevision: math.MaxUint32, + StopOffset: math.MaxInt64, + Collection: *volumeCollection, + }) + if err != nil { + return err + } + var buf bytes.Buffer + for { + resp, err := copyFileClient.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return err + } + buf.Write(resp.FileContent) + } + idxFile = bytes.NewReader(buf.Bytes()) + return nil + }) + if err != nil { + return nil, 0, err + } + + var maxOffset int64 + files := map[types.NeedleId]needleState{} + err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size uint32) error { + if offset.IsZero() || size == types.TombstoneFileSize { + files[key] = needleState{ + state: stateDeleted, + size: size, + } + } else { + files[key] = needleState{ + state: statePresent, + size: size, + } + } + if actual := offset.ToAcutalOffset(); actual > maxOffset { + maxOffset = actual + } + return nil + }) + if err != nil { + return nil, 0, err + } + return files, maxOffset, nil +} + +func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) { + var id string + err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ + VolumeId: v, + NeedleId: uint64(nid), + }) + if err != nil { + return err + } + id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String() + return nil + }) + return id, err +} -- cgit v1.2.3 From 6a92f0bc7a2cbf0828c720422220b600263b5217 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 18 Aug 2020 17:04:28 -0700 Subject: refactoring to typed Size Go is amazing with refactoring! --- unmaintained/diff_volume_servers/diff_volume_servers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 0d5bf9ab4..e0f16e2d0 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -154,7 +154,7 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 var maxOffset int64 files := map[types.NeedleId]needleState{} - err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size uint32) error { + err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size Size) error { if offset.IsZero() || size == types.TombstoneFileSize { files[key] = needleState{ state: stateDeleted, -- cgit v1.2.3 From 332caf0cd7881ef4881099fee09ac9c8a63d8f0b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 18 Aug 2020 17:23:01 -0700 Subject: maintain the unmaintained --- unmaintained/diff_volume_servers/diff_volume_servers.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index e0f16e2d0..339d9c335 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -118,7 +118,7 @@ const ( type needleState struct { state uint8 - size uint32 + size types.Size } func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { @@ -154,8 +154,8 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 var maxOffset int64 files := map[types.NeedleId]needleState{} - err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size Size) error { - if offset.IsZero() || size == types.TombstoneFileSize { + err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { + if offset.IsZero() || size < 0 || size == types.TombstoneFileSize { files[key] = needleState{ state: stateDeleted, size: size, -- cgit v1.2.3 From c026eb05921b180cad0b529634ff4c891cb5b61f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 18 Aug 2020 17:39:29 -0700 Subject: refactoring --- unmaintained/diff_volume_servers/diff_volume_servers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 339d9c335..4de864980 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -155,7 +155,7 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 var maxOffset int64 files := map[types.NeedleId]needleState{} err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { - if offset.IsZero() || size < 0 || size == types.TombstoneFileSize { + if offset.IsZero() || size.IsDeleted() { files[key] = needleState{ state: stateDeleted, size: size, -- cgit v1.2.3 From daf0a449f7424d4a8252673509af5afd0b9bd8ec Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 9 Sep 2020 12:07:15 -0700 Subject: properly cancel context for streaming grpc --- unmaintained/diff_volume_servers/diff_volume_servers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 4de864980..6107f3d48 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -124,7 +124,9 @@ type needleState struct { func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { var idxFile *bytes.Reader err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { - copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ VolumeId: v, Ext: ".idx", CompactionRevision: math.MaxUint32, -- cgit v1.2.3 From 75de7002ff028c419995b355db54c8ffc7544748 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 11 Sep 2020 11:43:13 -0700 Subject: adjust size --- unmaintained/diff_volume_servers/diff_volume_servers.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 6107f3d48..137cb82cf 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -158,6 +158,9 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 files := map[types.NeedleId]needleState{} err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { if offset.IsZero() || size.IsDeleted() { + if size < 0 { + size = -size + } files[key] = needleState{ state: stateDeleted, size: size, -- cgit v1.2.3 From f2723c1bc83148379dea24ae9c2e3835d089d8eb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 12 Sep 2020 12:42:36 -0700 Subject: do not idx file format revert c9ab8d05fa9b425352ce978b5c5b5b0d71d787ad --- unmaintained/diff_volume_servers/diff_volume_servers.go | 3 --- 1 file changed, 3 deletions(-) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 137cb82cf..6107f3d48 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -158,9 +158,6 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 files := map[types.NeedleId]needleState{} err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { if offset.IsZero() || size.IsDeleted() { - if size < 0 { - size = -size - } files[key] = needleState{ state: stateDeleted, size: size, -- cgit v1.2.3 From 7256902fb0c3ac637d40d8ee91508216b42a6d60 Mon Sep 17 00:00:00 2001 From: bingoohuang Date: Sun, 7 Feb 2021 12:11:51 +0800 Subject: fix typo offset.ToAcutalOffset to offset.ToActualOffset --- unmaintained/diff_volume_servers/diff_volume_servers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'unmaintained/diff_volume_servers/diff_volume_servers.go') diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 6107f3d48..27a537617 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -168,7 +168,7 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 size: size, } } - if actual := offset.ToAcutalOffset(); actual > maxOffset { + if actual := offset.ToActualOffset(); actual > maxOffset { maxOffset = actual } return nil -- cgit v1.2.3