aboutsummaryrefslogtreecommitdiff
path: root/unmaintained/diff_volume_servers
diff options
context:
space:
mode:
Diffstat (limited to 'unmaintained/diff_volume_servers')
-rw-r--r--unmaintained/diff_volume_servers/diff_volume_servers.go194
1 files changed, 194 insertions, 0 deletions
diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go
new file mode 100644
index 000000000..0d5bf9ab4
--- /dev/null
+++ b/unmaintained/diff_volume_servers/diff_volume_servers.go
@@ -0,0 +1,194 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+)
+
+var (
+ serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")
+ volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")
+ volumeCollection = flag.String("collection", "", "the volume collection name")
+ grpcDialOption grpc.DialOption
+)
+
+/*
+ Diff the volume's files across multiple volume servers.
+ diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
+
+ Example Output:
+ reference 127.0.0.1:8081
+ fileId volumeServer message
+ 5,01617c3f61 127.0.0.1:8080 wrongSize
+*/
+func main() {
+ flag.Parse()
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ vid := uint32(*volumeId)
+ servers := strings.Split(*serversStr, ",")
+ if len(servers) < 2 {
+ glog.Fatalf("You must specify more than 1 server\n")
+ }
+ var referenceServer string
+ var maxOffset int64
+ allFiles := map[string]map[types.NeedleId]needleState{}
+ for _, addr := range servers {
+ files, offset, err := getVolumeFiles(vid, addr)
+ if err != nil {
+ glog.Fatalf("Failed to copy idx from volume server %s\n", err)
+ }
+ allFiles[addr] = files
+ if offset > maxOffset {
+ referenceServer = addr
+ }
+ }
+
+ same := true
+ fmt.Println("reference", referenceServer)
+ fmt.Println("fileId volumeServer message")
+ for nid, n := range allFiles[referenceServer] {
+ for addr, files := range allFiles {
+ if addr == referenceServer {
+ continue
+ }
+ var diffMsg string
+ n2, ok := files[nid]
+ if !ok {
+ if n.state == stateDeleted {
+ continue
+ }
+ diffMsg = "missing"
+ } else if n2.state != n.state {
+ switch n.state {
+ case stateDeleted:
+ diffMsg = "notDeleted"
+ case statePresent:
+ diffMsg = "deleted"
+ }
+ } else if n2.size != n.size {
+ diffMsg = "wrongSize"
+ } else {
+ continue
+ }
+ same = false
+
+ // fetch the needle details
+ var id string
+ var err error
+ if n.state == statePresent {
+ id, err = getNeedleFileId(vid, nid, referenceServer)
+ } else {
+ id, err = getNeedleFileId(vid, nid, addr)
+ }
+ if err != nil {
+ glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
+ }
+ fmt.Println(id, addr, diffMsg)
+ }
+ }
+ if !same {
+ os.Exit(1)
+ }
+}
+
+const (
+ stateDeleted uint8 = 1
+ statePresent uint8 = 2
+)
+
+type needleState struct {
+ state uint8
+ size uint32
+}
+
+func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) {
+ var idxFile *bytes.Reader
+ err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
+ copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: v,
+ Ext: ".idx",
+ CompactionRevision: math.MaxUint32,
+ StopOffset: math.MaxInt64,
+ Collection: *volumeCollection,
+ })
+ if err != nil {
+ return err
+ }
+ var buf bytes.Buffer
+ for {
+ resp, err := copyFileClient.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ buf.Write(resp.FileContent)
+ }
+ idxFile = bytes.NewReader(buf.Bytes())
+ return nil
+ })
+ if err != nil {
+ return nil, 0, err
+ }
+
+ var maxOffset int64
+ files := map[types.NeedleId]needleState{}
+ err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ if offset.IsZero() || size == types.TombstoneFileSize {
+ files[key] = needleState{
+ state: stateDeleted,
+ size: size,
+ }
+ } else {
+ files[key] = needleState{
+ state: statePresent,
+ size: size,
+ }
+ }
+ if actual := offset.ToAcutalOffset(); actual > maxOffset {
+ maxOffset = actual
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, 0, err
+ }
+ return files, maxOffset, nil
+}
+
+func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) {
+ var id string
+ err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
+ resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
+ VolumeId: v,
+ NeedleId: uint64(nid),
+ })
+ if err != nil {
+ return err
+ }
+ id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
+ return nil
+ })
+ return id, err
+}