aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-04 21:43:30 -0700
committerchrislu <chris.lu@gmail.com>2023-09-04 21:43:30 -0700
commit9e4f98569898985ed285d8bb8a39b4ea5f095a98 (patch)
tree722d4bb8536334ec3ed97810760698faab719999
parentcb470d44df2fed94ad8fd370b1c281cb126d373b (diff)
downloadseaweedfs-9e4f98569898985ed285d8bb8a39b4ea5f095a98.tar.xz
seaweedfs-9e4f98569898985ed285d8bb8a39b4ea5f095a98.zip
publish, benchmark
-rw-r--r--weed/mq/broker/broker_grpc_pub.go4
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go48
-rw-r--r--weed/mq/client/pub_client/lookup.go105
-rw-r--r--weed/mq/client/pub_client/publish.go17
-rw-r--r--weed/mq/client/pub_client/publisher.go23
5 files changed, 138 insertions, 59 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 79393332f..37f937949 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -87,6 +87,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
}
+ stream.Send(response)
} else {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
@@ -106,7 +107,6 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
AckSequence: sequence,
}
if dataMessage := req.GetData(); dataMessage != nil {
- print("+")
localTopicPartition.Publish(dataMessage)
}
if err := stream.Send(response); err != nil {
@@ -114,6 +114,8 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
}
}
+ glog.Infof("publish stream closed")
+
return nil
}
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index a540143a4..f5a454640 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -1,12 +1,34 @@
package main
import (
+ "flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "log"
+ "sync"
+ "time"
)
-func main() {
+var (
+ messageCount = flag.Int("n", 1000, "message count")
+ concurrency = flag.Int("c", 4, "concurrency count")
+)
+
+func doPublish(publisher *pub_client.TopicPublisher, id int) {
+ startTime := time.Now()
+ for i := 0; i < *messageCount / *concurrency; i++ {
+ // Simulate publishing a message
+ key := []byte(fmt.Sprintf("key-%d-%d", id, i))
+ value := []byte(fmt.Sprintf("value-%d-%d", id, i))
+ publisher.Publish(key, value) // Call your publisher function here
+ // println("Published", string(key), string(value))
+ }
+ elapsed := time.Since(startTime)
+ log.Printf("Publisher %d finished in %s", id, elapsed)
+}
+func main() {
+ flag.Parse()
publisher := pub_client.NewTopicPublisher(
"test", "test")
if err := publisher.Connect("localhost:17777"); err != nil {
@@ -14,16 +36,22 @@ func main() {
return
}
- for i := 0; i < 10; i++ {
- if dataErr := publisher.Publish(
- []byte(fmt.Sprintf("key-%d", i)),
- []byte(fmt.Sprintf("value-%d", i)),
- ); dataErr != nil {
- fmt.Println(dataErr)
- return
- }
+ startTime := time.Now()
+
+ // Start multiple publishers
+ var wg sync.WaitGroup
+ for i := 0; i < *concurrency; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ doPublish(publisher, id)
+ }(i)
}
- fmt.Println("done publishing")
+ // Wait for all publishers to finish
+ wg.Wait()
+ elapsed := time.Since(startTime)
+
+ log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
}
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
index 5a9376ab1..8e279fb0b 100644
--- a/weed/mq/client/pub_client/lookup.go
+++ b/weed/mq/client/pub_client/lookup.go
@@ -24,43 +24,21 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error {
return err
}
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
- // partition => broker
- p.partition2Broker.Insert(
- brokerPartitionAssignment.Partition.RangeStart,
- brokerPartitionAssignment.Partition.RangeStop,
- brokerPartitionAssignment.LeaderBroker)
-
- // broker => publish client
- // send init message
- // save the publishing client
- brokerAddress := brokerPartitionAssignment.LeaderBroker
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
+ // partition => publishClient
+ publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
if err != nil {
- return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ return err
}
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- publishClient, err := brokerClient.Publish(context.Background())
- if err != nil {
- return fmt.Errorf("create publish client: %v", err)
- }
- p.broker2PublishClient.Set(brokerAddress, publishClient)
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: brokerPartitionAssignment.Partition.RingSize,
- RangeStart: brokerPartitionAssignment.Partition.RangeStart,
- RangeStop: brokerPartitionAssignment.Partition.RangeStop,
- },
- },
- },
- }); err != nil {
- return fmt.Errorf("send init message: %v", err)
+ for redirectTo != "" {
+ publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
+ if err != nil {
+ return err
+ }
}
+ p.partition2Broker.Insert(
+ brokerPartitionAssignment.Partition.RangeStart,
+ brokerPartitionAssignment.Partition.RangeStop,
+ publishClient)
}
return nil
})
@@ -70,3 +48,62 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error {
}
return nil
}
+
+// broker => publish client
+// send init message
+// save the publishing client
+func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
+ if err != nil {
+ return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ stream, err := brokerClient.Publish(context.Background())
+ if err != nil {
+ return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
+ }
+ publishClient = &PublishClient{
+ SeaweedMessaging_PublishClient: stream,
+ Broker: brokerAddress,
+ }
+ if err = publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Init{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: partition.RingSize,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ },
+ },
+ },
+ }); err != nil {
+ return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
+ }
+ resp, err := stream.Recv()
+ if err != nil {
+ return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
+ }
+ if resp.Error != "" {
+ return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
+ }
+ if resp.RedirectToBroker != "" {
+ redirectTo = resp.RedirectToBroker
+ return publishClient, redirectTo, nil
+ }
+
+ go func() {
+ for {
+ _, err := publishClient.Recv()
+ if err != nil {
+ publishClient.Err = err
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
+ return
+ }
+ }
+ }()
+ return publishClient, redirectTo, nil
+}
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 0ecb55c9b..de4714831 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -12,14 +12,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
if hashKey < 0 {
hashKey = -hashKey
}
- brokerAddress, found := p.partition2Broker.Floor(hashKey, hashKey)
+ publishClient, found := p.partition2Broker.Floor(hashKey, hashKey)
if !found {
return fmt.Errorf("no broker found for key %d", hashKey)
}
- publishClient, found := p.broker2PublishClient.Get(brokerAddress)
- if !found {
- return fmt.Errorf("no publish client found for broker %s", brokerAddress)
- }
+ p.Lock()
+ defer p.Unlock()
+ // dead lock here
+ //google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59)
+ //google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047)
+ //google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040)
+ //google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892)
+ //google.golang.org/grpc.(*clientStream).withRetry(stream.go:752)
+ //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894)
+ //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141)
+ //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19)
if err := publishClient.Send(&mq_pb.PublishRequest{
Message: &mq_pb.PublishRequest_Data{
Data: &mq_pb.DataMessage{
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 5963838ce..f264375fa 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -1,32 +1,37 @@
package pub_client
import (
- cmap "github.com/orcaman/concurrent-map/v2"
"github.com/rdleal/intervalst/interval"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "sync"
)
type PublisherConfiguration struct {
}
+
+type PublishClient struct {
+ mq_pb.SeaweedMessaging_PublishClient
+ Broker string
+ Err error
+}
type TopicPublisher struct {
- namespace string
- topic string
- partition2Broker *interval.SearchTree[string, int32]
- broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient]
- grpcDialOption grpc.DialOption
+ namespace string
+ topic string
+ partition2Broker *interval.SearchTree[*PublishClient, int32]
+ grpcDialOption grpc.DialOption
+ sync.Mutex // protects grpc
}
func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return &TopicPublisher{
namespace: namespace,
topic: topic,
- partition2Broker: interval.NewSearchTree[string](func(a, b int32) int {
+ partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
return int(a - b)
}),
- broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
- grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}