aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go6
-rw-r--r--weed/mq/client/sub_client/process.go81
-rw-r--r--weed/mq/client/sub_client/subscribe.go72
-rw-r--r--weed/mq/client/sub_client/subscriber.go11
4 files changed, 99 insertions, 71 deletions
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index 1ec24f406..6cb18c574 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -22,11 +22,7 @@ func main() {
Filter: "",
}
- subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig)
- if err := subscriber.Connect("localhost:17777"); err != nil {
- fmt.Println(err)
- return
- }
+ subscriber := sub_client.NewTopicSubscriber("localhost:17777", subscriberConfig, contentConfig)
subscriber.SetEachMessageFunc(func(key, value []byte) bool {
println(string(key), "=>", string(value))
diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go
new file mode 100644
index 000000000..7717a101f
--- /dev/null
+++ b/weed/mq/client/sub_client/process.go
@@ -0,0 +1,81 @@
+package sub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync"
+)
+
+func (sub *TopicSubscriber) doProcess() error {
+ var wg sync.WaitGroup
+ for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
+ brokerAddress := brokerPartitionAssignment.LeaderBroker
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption)
+ if err != nil {
+ return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
+ Message: &mq_pb.SubscribeRequest_Init{
+ Init: &mq_pb.SubscribeRequest_InitMessage{
+ ConsumerGroup: sub.SubscriberConfig.GroupId,
+ ConsumerId: sub.SubscriberConfig.GroupInstanceId,
+ Topic: &mq_pb.Topic{
+ Namespace: sub.ContentConfig.Namespace,
+ Name: sub.ContentConfig.Topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: brokerPartitionAssignment.Partition.RingSize,
+ RangeStart: brokerPartitionAssignment.Partition.RangeStart,
+ RangeStop: brokerPartitionAssignment.Partition.RangeStop,
+ },
+ Filter: sub.ContentConfig.Filter,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("create subscribe client: %v", err)
+ }
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if sub.OnCompletionFunc != nil {
+ defer sub.OnCompletionFunc()
+ }
+ defer func() {
+ subscribeClient.SendMsg(&mq_pb.SubscribeRequest{
+ Message: &mq_pb.SubscribeRequest_Ack{
+ Ack: &mq_pb.SubscribeRequest_AckMessage{
+ Sequence: 0,
+ },
+ },
+ })
+ subscribeClient.CloseSend()
+ }()
+ for {
+ resp, err := subscribeClient.Recv()
+ if err != nil {
+ fmt.Printf("subscribe error: %v\n", err)
+ return
+ }
+ if resp.Message == nil {
+ continue
+ }
+ switch m := resp.Message.(type) {
+ case *mq_pb.SubscribeResponse_Data:
+ if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
+ return
+ }
+ case *mq_pb.SubscribeResponse_Ctrl:
+ if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
+ return
+ }
+ }
+ }
+ }()
+ }
+ wg.Wait()
+ return nil
+}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 7830ac29f..bfde6a512 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,72 +1,28 @@
package sub_client
import (
- "context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "sync"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "io"
)
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
- var wg sync.WaitGroup
- for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
- brokerAddress := brokerPartitionAssignment.LeaderBroker
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption)
- if err != nil {
- return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ util.RetryUntil("subscribe", func() error {
+ if err := sub.doLookup(sub.bootstrapBroker); err != nil {
+ return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
- Message: &mq_pb.SubscribeRequest_Init{
- Init: &mq_pb.SubscribeRequest_InitMessage{
- ConsumerGroup: sub.SubscriberConfig.GroupId,
- ConsumerId: sub.SubscriberConfig.GroupInstanceId,
- Topic: &mq_pb.Topic{
- Namespace: sub.ContentConfig.Namespace,
- Name: sub.ContentConfig.Topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: brokerPartitionAssignment.Partition.RingSize,
- RangeStart: brokerPartitionAssignment.Partition.RangeStart,
- RangeStop: brokerPartitionAssignment.Partition.RangeStop,
- },
- Filter: sub.ContentConfig.Filter,
- },
- },
- })
- if err != nil {
- return fmt.Errorf("create subscribe client: %v", err)
+ if err := sub.doProcess(); err != nil {
+ return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
- wg.Add(1)
- go func() {
- defer wg.Done()
- if sub.OnCompletionFunc != nil {
- defer sub.OnCompletionFunc()
- }
- for {
- resp, err := subscribeClient.Recv()
- if err != nil {
- fmt.Printf("subscribe error: %v\n", err)
- return
- }
- if resp.Message == nil {
- continue
- }
- switch m := resp.Message.(type) {
- case *mq_pb.SubscribeResponse_Data:
- if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
- return
- }
- case *mq_pb.SubscribeResponse_Ctrl:
- // ignore
- }
- }
- }()
- }
- wg.Wait()
+ return nil
+ }, func(err error) bool {
+ if err == io.EOF {
+ return false
+ }
+ return true
+ })
return nil
}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 404d05222..f744c6fa2 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -28,22 +28,17 @@ type TopicSubscriber struct {
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
+ bootstrapBroker string
}
-func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
+func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
SubscriberConfig: subscriber,
ContentConfig: content,
+ bootstrapBroker: bootstrapBroker,
}
}
-func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
- if err := sub.doLookup(bootstrapBroker); err != nil {
- return err
- }
- return nil
-}
-
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
sub.OnEachMessageFunc = onEachMessageFn
}