aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client')
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go14
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go18
-rw-r--r--weed/mq/client/pub_client/connect.go73
-rw-r--r--weed/mq/client/pub_client/lookup.go116
-rw-r--r--weed/mq/client/pub_client/publish.go4
-rw-r--r--weed/mq/client/pub_client/publisher.go26
-rw-r--r--weed/mq/client/sub_client/process.go4
-rw-r--r--weed/mq/client/sub_client/subscribe.go17
-rw-r--r--weed/mq/client/sub_client/subscriber.go20
9 files changed, 195 insertions, 97 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 03674db3f..ee00be9f8 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"log"
+ "strings"
"sync"
"time"
)
@@ -12,6 +13,10 @@ import (
var (
messageCount = flag.Int("n", 1000, "message count")
concurrency = flag.Int("c", 4, "concurrency count")
+
+ namespace = flag.String("ns", "test", "namespace")
+ topic = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
)
func doPublish(publisher *pub_client.TopicPublisher, id int) {
@@ -29,9 +34,12 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
func main() {
flag.Parse()
- publisher := pub_client.NewTopicPublisher(
- "test", "test")
- if err := publisher.Connect("localhost:17777"); err != nil {
+ config := &pub_client.PublisherConfiguration{
+ CreateTopic: true,
+ }
+ publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
+ brokers := strings.Split(*seedBrokers, ",")
+ if err := publisher.Connect(brokers); err != nil {
fmt.Println(err)
return
}
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index 6cb18c574..d5bd8f12d 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -1,13 +1,23 @@
package main
import (
+ "flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "strings"
+ "time"
+)
+
+var (
+ namespace = flag.String("ns", "test", "namespace")
+ topic = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
)
func main() {
+ flag.Parse()
subscriberConfig := &sub_client.SubscriberConfiguration{
ClientId: "testSubscriber",
@@ -17,12 +27,14 @@ func main() {
}
contentConfig := &sub_client.ContentConfiguration{
- Namespace: "test",
- Topic: "test",
+ Namespace: *namespace,
+ Topic: *topic,
Filter: "",
+ StartTime: time.Now(),
}
- subscriber := sub_client.NewTopicSubscriber("localhost:17777", subscriberConfig, contentConfig)
+ brokers := strings.Split(*seedBrokers, ",")
+ subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
subscriber.SetEachMessageFunc(func(key, value []byte) bool {
println(string(key), "=>", string(value))
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go
new file mode 100644
index 000000000..fc7ff4d77
--- /dev/null
+++ b/weed/mq/client/pub_client/connect.go
@@ -0,0 +1,73 @@
+package pub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "log"
+)
+
+// broker => publish client
+// send init message
+// save the publishing client
+func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
+ log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
+
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
+ if err != nil {
+ return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ stream, err := brokerClient.Publish(context.Background())
+ if err != nil {
+ return publishClient, fmt.Errorf("create publish client: %v", err)
+ }
+ publishClient = &PublishClient{
+ SeaweedMessaging_PublishClient: stream,
+ Broker: brokerAddress,
+ }
+ if err = publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Init{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: partition.RingSize,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ },
+ AckInterval: 128,
+ },
+ },
+ }); err != nil {
+ return publishClient, fmt.Errorf("send init message: %v", err)
+ }
+ resp, err := stream.Recv()
+ if err != nil {
+ return publishClient, fmt.Errorf("recv init response: %v", err)
+ }
+ if resp.Error != "" {
+ return publishClient, fmt.Errorf("init response error: %v", resp.Error)
+ }
+
+ go func() {
+ for {
+ _, err := publishClient.Recv()
+ if err != nil {
+ e, ok := status.FromError(err)
+ if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
+ return
+ }
+ publishClient.Err = err
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
+ return
+ }
+ }
+ }()
+ return publishClient, nil
+}
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
index 28cb29015..e55bfd256 100644
--- a/weed/mq/client/pub_client/lookup.go
+++ b/weed/mq/client/pub_client/lookup.go
@@ -5,11 +5,28 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
-func (p *TopicPublisher) doLookup(brokerAddress string) error {
+func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
+ if p.config.CreateTopic {
+ err := pb.WithBrokerGrpcClient(true,
+ brokerAddress,
+ p.grpcDialOption,
+ func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ PartitionCount: p.config.CreateTopicPartitionCount,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
+ }
+ }
+
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
p.grpcDialOption,
@@ -22,21 +39,36 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error {
},
IsForPublish: true,
})
+ if p.config.CreateTopic && err != nil {
+ _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ PartitionCount: p.config.CreateTopicPartitionCount,
+ })
+ if err != nil {
+ return err
+ }
+ lookupResp, err = client.LookupTopicBrokers(context.Background(),
+ &mq_pb.LookupTopicBrokersRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ IsForPublish: true,
+ })
+ }
if err != nil {
return err
}
+
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
// partition => publishClient
- publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
+ publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
if err != nil {
return err
}
- for redirectTo != "" {
- publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
- if err != nil {
- return err
- }
- }
p.partition2Broker.Insert(
brokerPartitionAssignment.Partition.RangeStart,
brokerPartitionAssignment.Partition.RangeStop,
@@ -50,67 +82,3 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error {
}
return nil
}
-
-// broker => publish client
-// send init message
-// save the publishing client
-func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.Publish(context.Background())
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
- }
- publishClient = &PublishClient{
- SeaweedMessaging_PublishClient: stream,
- Broker: brokerAddress,
- }
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- },
- AckInterval: 128,
- },
- },
- }); err != nil {
- return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
- }
- resp, err := stream.Recv()
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
- }
- if resp.Error != "" {
- return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
- }
- if resp.RedirectToBroker != "" {
- redirectTo = resp.RedirectToBroker
- return publishClient, redirectTo, nil
- }
-
- go func() {
- for {
- _, err := publishClient.Recv()
- if err != nil {
- e, ok := status.FromError(err)
- if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
- return
- }
- publishClient.Err = err
- fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
- return
- }
- }
- }()
- return publishClient, redirectTo, nil
-}
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 9495e380c..1e250ede3 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -2,13 +2,13 @@ package pub_client
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
- hashKey := util.HashToInt32(key) % balancer.MaxPartitionCount
+ hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index bf1711e38..a0c26db36 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -1,8 +1,9 @@
package pub_client
import (
+ "fmt"
"github.com/rdleal/intervalst/interval"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -11,6 +12,8 @@ import (
)
type PublisherConfiguration struct {
+ CreateTopic bool
+ CreateTopicPartitionCount int32
}
type PublishClient struct {
@@ -24,9 +27,10 @@ type TopicPublisher struct {
partition2Broker *interval.SearchTree[*PublishClient, int32]
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
+ config *PublisherConfiguration
}
-func NewTopicPublisher(namespace, topic string) *TopicPublisher {
+func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher {
return &TopicPublisher{
namespace: namespace,
topic: topic,
@@ -34,19 +38,27 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return int(a - b)
}),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ config: config,
}
}
-func (p *TopicPublisher) Connect(bootstrapBroker string) error {
- if err := p.doLookup(bootstrapBroker); err != nil {
- return err
+func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) {
+ if len(bootstrapBrokers) == 0 {
+ return nil
}
- return nil
+ for _, b := range bootstrapBrokers {
+ err = p.doLookupAndConnect(b)
+ if err == nil {
+ return nil
+ }
+ fmt.Printf("failed to connect to %s: %v\n\n", b, err)
+ }
+ return err
}
func (p *TopicPublisher) Shutdown() error {
- if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found {
+ if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, client := range clients {
client.CloseSend()
}
diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go
index 7717a101f..b6bdb14ee 100644
--- a/weed/mq/client/sub_client/process.go
+++ b/weed/mq/client/sub_client/process.go
@@ -32,6 +32,9 @@ func (sub *TopicSubscriber) doProcess() error {
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
+ Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{
+ StartTimestampNs: sub.alreadyProcessedTsNs,
+ },
},
},
})
@@ -68,6 +71,7 @@ func (sub *TopicSubscriber) doProcess() error {
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
return
}
+ sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 0803b2c79..370f5aa3c 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -4,17 +4,30 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "log"
+ "time"
)
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
+ index := -1
util.RetryUntil("subscribe", func() error {
+ index++
+ index = index % len(sub.bootstrapBrokers)
// ask balancer for brokers of the topic
- if err := sub.doLookup(sub.bootstrapBroker); err != nil {
+ if err := sub.doLookup(sub.bootstrapBrokers[index]); err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
+ if len(sub.brokerPartitionAssignments) == 0 {
+ if sub.waitForMoreMessage {
+ time.Sleep(1 * time.Second)
+ return fmt.Errorf("no broker partition assignments")
+ } else {
+ return nil
+ }
+ }
// treat the first broker as the topic leader
// connect to the leader broker
@@ -25,6 +38,8 @@ func (sub *TopicSubscriber) Subscribe() error {
return nil
}, func(err error) bool {
if err == io.EOF {
+ log.Printf("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
+ sub.waitForMoreMessage = false
return false
}
return true
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 809673de1..9b96b14cb 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -3,6 +3,7 @@ package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
+ "time"
)
type SubscriberConfiguration struct {
@@ -19,6 +20,7 @@ type ContentConfiguration struct {
Namespace string
Topic string
Filter string
+ StartTime time.Time
}
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
@@ -30,14 +32,18 @@ type TopicSubscriber struct {
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
- bootstrapBroker string
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ alreadyProcessedTsNs int64
}
-func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
+func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- bootstrapBroker: bootstrapBroker,
+ SubscriberConfig: subscriber,
+ ContentConfig: content,
+ bootstrapBrokers: bootstrapBrokers,
+ waitForMoreMessage: true,
+ alreadyProcessedTsNs: content.StartTime.UnixNano(),
}
}
@@ -45,6 +51,6 @@ func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc
sub.OnEachMessageFunc = onEachMessageFn
}
-func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
- sub.OnCompletionFunc = onCompeletionFn
+func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
+ sub.OnCompletionFunc = onCompletionFn
}