aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go3
-rw-r--r--weed/messaging/broker/consistent_distribution.go4
-rw-r--r--weed/messaging/broker/consistent_distribution_test.go4
-rw-r--r--weed/messaging/broker/topic_lock.go2
-rw-r--r--weed/messaging/msgclient/client.go1
-rw-r--r--weed/messaging/msgclient/publisher.go1
-rw-r--r--weed/messaging/msgclient/subscriber.go2
7 files changed, 9 insertions, 8 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 86ee6923d..76cbdef24 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -37,8 +37,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// IsTransient: true,
}
- if err = stream.Send(&messaging_pb.BrokerMessage{
- }); err != nil {
+ if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
return err
}
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go
index dd7d34f86..465a2a8f2 100644
--- a/weed/messaging/broker/consistent_distribution.go
+++ b/weed/messaging/broker/consistent_distribution.go
@@ -1,8 +1,8 @@
package broker
import (
- "github.com/cespare/xxhash"
"github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
)
type Member string
@@ -35,4 +35,4 @@ func PickMember(members []string, key []byte) string {
m := c.LocateKey(key)
return m.String()
-} \ No newline at end of file
+}
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go
index 192516092..f58fe4e0e 100644
--- a/weed/messaging/broker/consistent_distribution_test.go
+++ b/weed/messaging/broker/consistent_distribution_test.go
@@ -18,7 +18,7 @@ func TestPickMember(t *testing.T) {
total := 1000
distribution := make(map[string]int)
- for i:=0;i<total;i++{
+ for i := 0; i < total; i++ {
tp := fmt.Sprintf("tp:%2d", i)
m := PickMember(servers, []byte(tp))
// println(tp, "=>", m)
@@ -29,4 +29,4 @@ func TestPickMember(t *testing.T) {
fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
}
-} \ No newline at end of file
+}
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
index 4c4803275..9ae446df3 100644
--- a/weed/messaging/broker/topic_lock.go
+++ b/weed/messaging/broker/topic_lock.go
@@ -16,9 +16,11 @@ type TopicPartition struct {
Topic string
Partition int32
}
+
const (
TopicPartitionFmt = "%s/%s_%02d"
)
+
func (tp *TopicPartition) String() string {
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
}
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
index f4e11232e..4d7ef2b8e 100644
--- a/weed/messaging/msgclient/client.go
+++ b/weed/messaging/msgclient/client.go
@@ -28,7 +28,6 @@ func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
}
}
-
func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
for _, broker := range mc.bootstrapBrokers {
diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go
index 08f1d278a..ebb6d3f2a 100644
--- a/weed/messaging/msgclient/publisher.go
+++ b/weed/messaging/msgclient/publisher.go
@@ -16,6 +16,7 @@ type Publisher struct {
messageCount uint64
publisherId string
}
+
/*
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
// read topic configuration
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
index d3066d6ef..efbfa0337 100644
--- a/weed/messaging/msgclient/subscriber.go
+++ b/weed/messaging/msgclient/subscriber.go
@@ -5,8 +5,8 @@ import (
"io"
"time"
- "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "google.golang.org/grpc"
)
type Subscriber struct {