diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 10 | ||||
| -rw-r--r-- | weed/mq/broker/consistent_distribution.go | 38 | ||||
| -rw-r--r-- | weed/mq/broker/consistent_distribution_test.go | 32 |
3 files changed, 10 insertions, 70 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go new file mode 100644 index 000000000..e8914cbed --- /dev/null +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -0,0 +1,10 @@ +package broker + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" +) + +func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { + panic("implement me") +} diff --git a/weed/mq/broker/consistent_distribution.go b/weed/mq/broker/consistent_distribution.go deleted file mode 100644 index 465a2a8f2..000000000 --- a/weed/mq/broker/consistent_distribution.go +++ /dev/null @@ -1,38 +0,0 @@ -package broker - -import ( - "github.com/buraksezer/consistent" - "github.com/cespare/xxhash" -) - -type Member string - -func (m Member) String() string { - return string(m) -} - -type hasher struct{} - -func (h hasher) Sum64(data []byte) uint64 { - return xxhash.Sum64(data) -} - -func PickMember(members []string, key []byte) string { - cfg := consistent.Config{ - PartitionCount: 9791, - ReplicationFactor: 2, - Load: 1.25, - Hasher: hasher{}, - } - - cmembers := []consistent.Member{} - for _, m := range members { - cmembers = append(cmembers, Member(m)) - } - - c := consistent.New(cmembers, cfg) - - m := c.LocateKey(key) - - return m.String() -} diff --git a/weed/mq/broker/consistent_distribution_test.go b/weed/mq/broker/consistent_distribution_test.go deleted file mode 100644 index f58fe4e0e..000000000 --- a/weed/mq/broker/consistent_distribution_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package broker - -import ( - "fmt" - "testing" -) - -func TestPickMember(t *testing.T) { - - servers := []string{ - "s1:port", - "s2:port", - "s3:port", - "s5:port", - "s4:port", - } - - total := 1000 - - distribution := make(map[string]int) - for i := 0; i < total; i++ { - tp := fmt.Sprintf("tp:%2d", i) - m := PickMember(servers, []byte(tp)) - // println(tp, "=>", m) - distribution[m]++ - } - - for member, count := range distribution { - fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers))) - } - -} |
