diff options
Diffstat (limited to 'weed/mq/client')
| -rw-r--r-- | weed/mq/client/cmd/weed_pub/publisher.go | 14 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub/subscriber.go | 18 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/connect.go | 73 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/lookup.go | 116 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publish.go | 4 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publisher.go | 26 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/process.go | 4 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 17 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 20 |
9 files changed, 195 insertions, 97 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 03674db3f..ee00be9f8 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "log" + "strings" "sync" "time" ) @@ -12,6 +13,10 @@ import ( var ( messageCount = flag.Int("n", 1000, "message count") concurrency = flag.Int("c", 4, "concurrency count") + + namespace = flag.String("ns", "test", "namespace") + topic = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") ) func doPublish(publisher *pub_client.TopicPublisher, id int) { @@ -29,9 +34,12 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { func main() { flag.Parse() - publisher := pub_client.NewTopicPublisher( - "test", "test") - if err := publisher.Connect("localhost:17777"); err != nil { + config := &pub_client.PublisherConfiguration{ + CreateTopic: true, + } + publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) + brokers := strings.Split(*seedBrokers, ",") + if err := publisher.Connect(brokers); err != nil { fmt.Println(err) return } diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 6cb18c574..d5bd8f12d 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -1,13 +1,23 @@ package main import ( + "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "strings" + "time" +) + +var ( + namespace = flag.String("ns", "test", "namespace") + topic = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") ) func main() { + flag.Parse() subscriberConfig := &sub_client.SubscriberConfiguration{ ClientId: "testSubscriber", @@ -17,12 +27,14 @@ func main() { } contentConfig := &sub_client.ContentConfiguration{ - Namespace: "test", - Topic: "test", + Namespace: *namespace, + Topic: *topic, Filter: "", + StartTime: time.Now(), } - subscriber := sub_client.NewTopicSubscriber("localhost:17777", subscriberConfig, contentConfig) + brokers := strings.Split(*seedBrokers, ",") + subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig) subscriber.SetEachMessageFunc(func(key, value []byte) bool { println(string(key), "=>", string(value)) diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go new file mode 100644 index 000000000..fc7ff4d77 --- /dev/null +++ b/weed/mq/client/pub_client/connect.go @@ -0,0 +1,73 @@ +package pub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "log" +) + +// broker => publish client +// send init message +// save the publishing client +func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) { + log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition) + + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) + if err != nil { + return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + stream, err := brokerClient.Publish(context.Background()) + if err != nil { + return publishClient, fmt.Errorf("create publish client: %v", err) + } + publishClient = &PublishClient{ + SeaweedMessaging_PublishClient: stream, + Broker: brokerAddress, + } + if err = publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Init{ + Init: &mq_pb.PublishRequest_InitMessage{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + Partition: &mq_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + AckInterval: 128, + }, + }, + }); err != nil { + return publishClient, fmt.Errorf("send init message: %v", err) + } + resp, err := stream.Recv() + if err != nil { + return publishClient, fmt.Errorf("recv init response: %v", err) + } + if resp.Error != "" { + return publishClient, fmt.Errorf("init response error: %v", resp.Error) + } + + go func() { + for { + _, err := publishClient.Recv() + if err != nil { + e, ok := status.FromError(err) + if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { + return + } + publishClient.Err = err + fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) + return + } + } + }() + return publishClient, nil +} diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index 28cb29015..e55bfd256 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -5,11 +5,28 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) -func (p *TopicPublisher) doLookup(brokerAddress string) error { +func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { + if p.config.CreateTopic { + err := pb.WithBrokerGrpcClient(true, + brokerAddress, + p.grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + PartitionCount: p.config.CreateTopicPartitionCount, + }) + return err + }) + if err != nil { + return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) + } + } + err := pb.WithBrokerGrpcClient(true, brokerAddress, p.grpcDialOption, @@ -22,21 +39,36 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error { }, IsForPublish: true, }) + if p.config.CreateTopic && err != nil { + _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + PartitionCount: p.config.CreateTopicPartitionCount, + }) + if err != nil { + return err + } + lookupResp, err = client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + IsForPublish: true, + }) + } if err != nil { return err } + for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { // partition => publishClient - publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) + publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) if err != nil { return err } - for redirectTo != "" { - publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo) - if err != nil { - return err - } - } p.partition2Broker.Insert( brokerPartitionAssignment.Partition.RangeStart, brokerPartitionAssignment.Partition.RangeStop, @@ -50,67 +82,3 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error { } return nil } - -// broker => publish client -// send init message -// save the publishing client -func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) { - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) - if err != nil { - return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err) - } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.Publish(context.Background()) - if err != nil { - return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err) - } - publishClient = &PublishClient{ - SeaweedMessaging_PublishClient: stream, - Broker: brokerAddress, - } - if err = publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - }, - AckInterval: 128, - }, - }, - }); err != nil { - return publishClient, redirectTo, fmt.Errorf("send init message: %v", err) - } - resp, err := stream.Recv() - if err != nil { - return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err) - } - if resp.Error != "" { - return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error) - } - if resp.RedirectToBroker != "" { - redirectTo = resp.RedirectToBroker - return publishClient, redirectTo, nil - } - - go func() { - for { - _, err := publishClient.Recv() - if err != nil { - e, ok := status.FromError(err) - if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { - return - } - publishClient.Err = err - fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) - return - } - } - }() - return publishClient, redirectTo, nil -} diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 9495e380c..1e250ede3 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -2,13 +2,13 @@ package pub_client import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/mq/balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" ) func (p *TopicPublisher) Publish(key, value []byte) error { - hashKey := util.HashToInt32(key) % balancer.MaxPartitionCount + hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount if hashKey < 0 { hashKey = -hashKey } diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index bf1711e38..a0c26db36 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -1,8 +1,9 @@ package pub_client import ( + "fmt" "github.com/rdleal/intervalst/interval" - "github.com/seaweedfs/seaweedfs/weed/mq/balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -11,6 +12,8 @@ import ( ) type PublisherConfiguration struct { + CreateTopic bool + CreateTopicPartitionCount int32 } type PublishClient struct { @@ -24,9 +27,10 @@ type TopicPublisher struct { partition2Broker *interval.SearchTree[*PublishClient, int32] grpcDialOption grpc.DialOption sync.Mutex // protects grpc + config *PublisherConfiguration } -func NewTopicPublisher(namespace, topic string) *TopicPublisher { +func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher { return &TopicPublisher{ namespace: namespace, topic: topic, @@ -34,19 +38,27 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher { return int(a - b) }), grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + config: config, } } -func (p *TopicPublisher) Connect(bootstrapBroker string) error { - if err := p.doLookup(bootstrapBroker); err != nil { - return err +func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) { + if len(bootstrapBrokers) == 0 { + return nil } - return nil + for _, b := range bootstrapBrokers { + err = p.doLookupAndConnect(b) + if err == nil { + return nil + } + fmt.Printf("failed to connect to %s: %v\n\n", b, err) + } + return err } func (p *TopicPublisher) Shutdown() error { - if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found { + if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found { for _, client := range clients { client.CloseSend() } diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go index 7717a101f..b6bdb14ee 100644 --- a/weed/mq/client/sub_client/process.go +++ b/weed/mq/client/sub_client/process.go @@ -32,6 +32,9 @@ func (sub *TopicSubscriber) doProcess() error { RangeStop: brokerPartitionAssignment.Partition.RangeStop, }, Filter: sub.ContentConfig.Filter, + Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{ + StartTimestampNs: sub.alreadyProcessedTsNs, + }, }, }, }) @@ -68,6 +71,7 @@ func (sub *TopicSubscriber) doProcess() error { if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { return } + sub.alreadyProcessedTsNs = m.Data.TsNs case *mq_pb.SubscribeResponse_Ctrl: if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { return diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index 0803b2c79..370f5aa3c 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -4,17 +4,30 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/util" "io" + "log" + "time" ) // Subscribe subscribes to a topic's specified partitions. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. func (sub *TopicSubscriber) Subscribe() error { + index := -1 util.RetryUntil("subscribe", func() error { + index++ + index = index % len(sub.bootstrapBrokers) // ask balancer for brokers of the topic - if err := sub.doLookup(sub.bootstrapBroker); err != nil { + if err := sub.doLookup(sub.bootstrapBrokers[index]); err != nil { return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } + if len(sub.brokerPartitionAssignments) == 0 { + if sub.waitForMoreMessage { + time.Sleep(1 * time.Second) + return fmt.Errorf("no broker partition assignments") + } else { + return nil + } + } // treat the first broker as the topic leader // connect to the leader broker @@ -25,6 +38,8 @@ func (sub *TopicSubscriber) Subscribe() error { return nil }, func(err error) bool { if err == io.EOF { + log.Printf("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) + sub.waitForMoreMessage = false return false } return true diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 809673de1..9b96b14cb 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -3,6 +3,7 @@ package sub_client import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" + "time" ) type SubscriberConfiguration struct { @@ -19,6 +20,7 @@ type ContentConfiguration struct { Namespace string Topic string Filter string + StartTime time.Time } type OnEachMessageFunc func(key, value []byte) (shouldContinue bool) @@ -30,14 +32,18 @@ type TopicSubscriber struct { brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment OnEachMessageFunc OnEachMessageFunc OnCompletionFunc OnCompletionFunc - bootstrapBroker string + bootstrapBrokers []string + waitForMoreMessage bool + alreadyProcessedTsNs int64 } -func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ - SubscriberConfig: subscriber, - ContentConfig: content, - bootstrapBroker: bootstrapBroker, + SubscriberConfig: subscriber, + ContentConfig: content, + bootstrapBrokers: bootstrapBrokers, + waitForMoreMessage: true, + alreadyProcessedTsNs: content.StartTime.UnixNano(), } } @@ -45,6 +51,6 @@ func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc sub.OnEachMessageFunc = onEachMessageFn } -func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) { - sub.OnCompletionFunc = onCompeletionFn +func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) { + sub.OnCompletionFunc = onCompletionFn } |
