aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client')
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go13
-rw-r--r--weed/mq/client/pub_client/publish.go17
-rw-r--r--weed/mq/client/pub_client/publisher.go1
-rw-r--r--weed/mq/client/pub_client/scheduler.go35
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go21
5 files changed, 66 insertions, 21 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 2873ba21f..e9227130a 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -16,6 +16,8 @@ var (
concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count")
+ clientName = flag.String("client", "c1", "client name")
+
namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
@@ -25,16 +27,20 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
startTime := time.Now()
for i := 0; i < *messageCount / *concurrency; i++ {
// Simulate publishing a message
- key := []byte(fmt.Sprintf("key-%d-%d", id, i))
- value := []byte(fmt.Sprintf("value-%d-%d", id, i))
+ key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i))
+ value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i))
if err := publisher.Publish(key, value); err != nil {
fmt.Println(err)
break
}
+ time.Sleep(time.Second)
// println("Published", string(key), string(value))
}
+ if err := publisher.FinishPublish(); err != nil {
+ fmt.Println(err)
+ }
elapsed := time.Since(startTime)
- log.Printf("Publisher %d finished in %s", id, elapsed)
+ log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
}
func main() {
@@ -44,6 +50,7 @@ func main() {
CreateTopic: true,
CreateTopicPartitionCount: int32(*partitionCount),
Brokers: strings.Split(*seedBrokers, ","),
+ PublisherName: *clientName,
}
publisher := pub_client.NewTopicPublisher(config)
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1c5891049..fbb07b042 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "time"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
@@ -20,5 +21,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
return inputBuffer.Enqueue(&mq_pb.DataMessage{
Key: key,
Value: value,
+ TsNs: time.Now().UnixNano(),
})
}
+
+func (p *TopicPublisher) FinishPublish() error {
+ if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
+ for _, inputBuffer := range inputBuffers {
+ inputBuffer.Enqueue(&mq_pb.DataMessage{
+ TsNs: time.Now().UnixNano(),
+ Ctrl: &mq_pb.ControlMessage{
+ IsClose: true,
+ },
+ })
+ }
+ }
+
+ return nil
+}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 9262d6e0c..09984bae3 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -17,6 +17,7 @@ type PublisherConfiguration struct {
CreateTopic bool
CreateTopicPartitionCount int32
Brokers []string
+ PublisherName string // for debugging
}
type PublishClient struct {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e6caf896c..e92e07ab5 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
"log"
"sort"
"sync"
+ "sync/atomic"
"time"
)
@@ -145,11 +146,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Partition: job.Partition,
AckInterval: 128,
FollowerBrokers: job.FollowerBrokers,
+ PublisherName: p.config.PublisherName,
},
},
}); err != nil {
return fmt.Errorf("send init message: %v", err)
}
+ // process the hello message
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv init response: %v", err)
@@ -158,31 +161,44 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("init response error: %v", resp.Error)
}
+ var publishedTsNs int64
+ hasMoreData := int32(1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
ackResp, err := publishClient.Recv()
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Printf("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d", ackResp.AckSequence)
+ log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ }
+ if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ return
}
}
}()
publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
+ if data.Ctrl != nil && data.Ctrl.IsClose {
+ // need to set this before sending to brokers, to avoid timing issue
+ atomic.StoreInt32(&hasMoreData, 0)
+ }
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: data,
@@ -191,14 +207,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("send publish data: %v", err)
}
publishCounter++
+ atomic.StoreInt64(&publishedTsNs, data.TsNs)
}
-
- if err := publishClient.CloseSend(); err != nil {
- return fmt.Errorf("close send: %v", err)
+ if publishCounter > 0 {
+ wg.Wait()
+ } else {
+ // CloseSend would cancel the context on the server side
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
+ }
}
- time.Sleep(3 * time.Second)
-
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 4cc3c8ff2..b0b533e42 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -91,26 +91,26 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
var wg sync.WaitGroup
semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
- for _, assigned := range assignment.AssignedPartitions {
+ for _, assigned := range assignment.PartitionAssignments {
wg.Add(1)
semaphore <- struct{}{}
- go func(partition *mq_pb.Partition, broker string) {
+ go func(assigned *mq_pb.BrokerPartitionAssignment) {
defer wg.Done()
defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
- err := sub.onEachPartition(partition, broker)
+ glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ err := sub.onEachPartition(assigned)
if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err)
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
}
- }(assigned.Partition, assigned.Broker)
+ }(assigned)
}
wg.Wait()
}
-func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error {
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
// connect to the partition broker
- return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
@@ -118,11 +118,12 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
PartitionOffset: &mq_pb.PartitionOffset{
- Partition: partition,
+ Partition: assigned.Partition,
StartTsNs: sub.alreadyProcessedTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
Filter: sub.ContentConfig.Filter,
+ FollowerBrokers: assigned.FollowerBrokers,
},
},
})
@@ -131,7 +132,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt.Errorf("create subscribe client: %v", err)
}
- glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
+ glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()