aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-10 12:11:37 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:24:38 -0700
commit94b8c42b2cba40557ab28cd613ab8c19a93fca23 (patch)
treeb91f02bc735b31e613a862609384e1a7e4b8755f /weed/mq
parent8060fdcac56bae36b53764d7ad23a142a865e67d (diff)
downloadseaweedfs-94b8c42b2cba40557ab28cd613ab8c19a93fca23.tar.xz
seaweedfs-94b8c42b2cba40557ab28cd613ab8c19a93fca23.zip
clean up
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/brokder_grpc_admin.go10
-rw-r--r--weed/mq/broker/consistent_distribution.go38
-rw-r--r--weed/mq/broker/consistent_distribution_test.go32
3 files changed, 10 insertions, 70 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go
new file mode 100644
index 000000000..e8914cbed
--- /dev/null
+++ b/weed/mq/broker/brokder_grpc_admin.go
@@ -0,0 +1,10 @@
+package broker
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
+)
+
+func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
+ panic("implement me")
+}
diff --git a/weed/mq/broker/consistent_distribution.go b/weed/mq/broker/consistent_distribution.go
deleted file mode 100644
index 465a2a8f2..000000000
--- a/weed/mq/broker/consistent_distribution.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package broker
-
-import (
- "github.com/buraksezer/consistent"
- "github.com/cespare/xxhash"
-)
-
-type Member string
-
-func (m Member) String() string {
- return string(m)
-}
-
-type hasher struct{}
-
-func (h hasher) Sum64(data []byte) uint64 {
- return xxhash.Sum64(data)
-}
-
-func PickMember(members []string, key []byte) string {
- cfg := consistent.Config{
- PartitionCount: 9791,
- ReplicationFactor: 2,
- Load: 1.25,
- Hasher: hasher{},
- }
-
- cmembers := []consistent.Member{}
- for _, m := range members {
- cmembers = append(cmembers, Member(m))
- }
-
- c := consistent.New(cmembers, cfg)
-
- m := c.LocateKey(key)
-
- return m.String()
-}
diff --git a/weed/mq/broker/consistent_distribution_test.go b/weed/mq/broker/consistent_distribution_test.go
deleted file mode 100644
index f58fe4e0e..000000000
--- a/weed/mq/broker/consistent_distribution_test.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package broker
-
-import (
- "fmt"
- "testing"
-)
-
-func TestPickMember(t *testing.T) {
-
- servers := []string{
- "s1:port",
- "s2:port",
- "s3:port",
- "s5:port",
- "s4:port",
- }
-
- total := 1000
-
- distribution := make(map[string]int)
- for i := 0; i < total; i++ {
- tp := fmt.Sprintf("tp:%2d", i)
- m := PickMember(servers, []byte(tp))
- // println(tp, "=>", m)
- distribution[m]++
- }
-
- for member, count := range distribution {
- fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
- }
-
-}