aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-01-10 00:51:25 -0800
committerchrislu <chris.lu@gmail.com>2023-01-10 00:51:25 -0800
commit7f49c59c14efaec66966aae7f6721bb85469b902 (patch)
tree35cd220643b5ea500cb521f353b32e62ebfe4e5f
parent340e7c3a2e3b6fa64018bad1960b7afcf1f222eb (diff)
downloadseaweedfs-7f49c59c14efaec66966aae7f6721bb85469b902.tar.xz
seaweedfs-7f49c59c14efaec66966aae7f6721bb85469b902.zip
cluster.ps add filer meta sync progress
-rw-r--r--weed/filer/meta_aggregator.go12
-rw-r--r--weed/shell/command_cluster_ps.go21
2 files changed, 29 insertions, 4 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index fbc163442..8cd7d5bf9 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -243,10 +243,15 @@ const (
MetaOffsetPrefix = "Meta"
)
-func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
-
+func GetPeerMetaOffsetKey(peerSignature int32) []byte {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
+ return key
+}
+
+func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
+
+ key := GetPeerMetaOffsetKey(peerSignature)
value, err := f.Store.KvGet(context.Background(), key)
@@ -263,8 +268,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignat
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
- key := []byte(MetaOffsetPrefix + "xxxx")
- util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
+ key := GetPeerMetaOffsetKey(peerSignature)
value := make([]byte, 8)
util.Uint64toBytes(value, uint64(lastTsNs))
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go
index 5c495b2e2..fc6725fad 100644
--- a/weed/shell/command_cluster_ps.go
+++ b/weed/shell/command_cluster_ps.go
@@ -5,10 +5,13 @@ import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/cluster"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@@ -92,6 +95,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
}
}
+ filerSignatures := make(map[*master_pb.ListClusterNodesResponse_ClusterNode]int32)
fmt.Fprintf(writer, "* filers %d\n", len(filerNodes))
for _, node := range filerNodes {
fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)
@@ -108,12 +112,29 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup)
}
fmt.Fprintf(writer, " signature: %d\n", resp.Signature)
+ filerSignatures[node] = resp.Signature
} else {
fmt.Fprintf(writer, " failed to connect: %v\n", err)
}
return err
})
}
+ for _, node := range filerNodes {
+ pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address)
+ selfSignature := filerSignatures[node]
+ for peer, peerSignature := range filerSignatures {
+ if selfSignature == peerSignature {
+ continue
+ }
+ if resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: filer.GetPeerMetaOffsetKey(peerSignature)}); err == nil && len(resp.Value) == 8 {
+ lastTsNs := int64(util.BytesToUint64(resp.Value))
+ fmt.Fprintf(writer, " %s: %v\n", peer.Address, time.Unix(0, lastTsNs).UTC())
+ }
+ }
+ return nil
+ })
+ }
// collect volume servers
var volumeServers []pb.ServerAddress