aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go1
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go5
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go14
-rw-r--r--weed/mq/client/pub_client/lookup.go86
-rw-r--r--weed/mq/client/pub_client/publish.go32
-rw-r--r--weed/mq/client/pub_client/publisher.go25
-rw-r--r--weed/mq/client/pub_client/scheduler.go74
7 files changed, 103 insertions, 134 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 4ba1a0f75..6f0485c0c 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -32,7 +32,6 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
ret.Topic = request.Topic
if conf, err = b.readTopicConfFromFiler(t); err != nil {
glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
- ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
} else {
err = b.ensureTopicActiveAssignments(t, conf)
}
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 0eeefbdf0..c5d8e3b78 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -37,8 +37,11 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
+ if err == filer_pb.ErrNotFound {
+ return err
+ }
if err != nil {
- return fmt.Errorf("read topic %v partition %v conf: %v", t, err)
+ return fmt.Errorf("read topic.conf of %v: %v", t, err)
}
// parse into filer conf object
conf = &mq_pb.ConfigureTopicResponse{}
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 59469e66b..3ac037973 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -30,7 +30,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
fmt.Println(err)
break
}
- // println("Published", string(key), string(value))
+ println("Published", string(key), string(value))
}
elapsed := time.Since(startTime)
log.Printf("Publisher %d finished in %s", id, elapsed)
@@ -43,11 +43,13 @@ func main() {
CreateTopicPartitionCount: int32(*partitionCount),
}
publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
- brokers := strings.Split(*seedBrokers, ",")
- if err := publisher.Connect(brokers); err != nil {
- fmt.Println(err)
- return
- }
+ go func() {
+ brokers := strings.Split(*seedBrokers, ",")
+ if err := publisher.StartSchedulerThread(brokers); err != nil {
+ fmt.Println(err)
+ return
+ }
+ }()
startTime := time.Now()
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
deleted file mode 100644
index ce65bbc92..000000000
--- a/weed/mq/client/pub_client/lookup.go
+++ /dev/null
@@ -1,86 +0,0 @@
-package pub_client
-
-import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-)
-
-func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
- if p.config.CreateTopic {
- err := pb.WithBrokerGrpcClient(true,
- brokerAddress,
- p.grpcDialOption,
- func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- PartitionCount: p.config.CreateTopicPartitionCount,
- })
- return err
- })
- if err != nil {
- return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
- }
- }
-
- err := pb.WithBrokerGrpcClient(true,
- brokerAddress,
- p.grpcDialOption,
- func(client mq_pb.SeaweedMessagingClient) error {
- lookupResp, err := client.LookupTopicBrokers(context.Background(),
- &mq_pb.LookupTopicBrokersRequest{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- })
- glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp)
- if p.config.CreateTopic && err != nil {
- _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- PartitionCount: p.config.CreateTopicPartitionCount,
- })
- if err != nil {
- return err
- }
- lookupResp, err = client.LookupTopicBrokers(context.Background(),
- &mq_pb.LookupTopicBrokersRequest{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- })
- glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp)
- }
- if err != nil {
- return err
- }
-
- for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
- glog.V(0).Infof("topic %s/%s partition %v leader %s followers %v", p.namespace, p.topic, brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker, brokerPartitionAssignment.FollowerBrokers)
- // partition => publishClient
- publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
- if err != nil {
- return err
- }
- p.partition2Broker.Insert(
- brokerPartitionAssignment.Partition.RangeStart,
- brokerPartitionAssignment.Partition.RangeStop,
- publishClient)
- }
- return nil
- })
-
- if err != nil {
- return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
- }
- return nil
-}
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 4b0dfade9..3b9817e74 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -7,35 +7,19 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
+
func (p *TopicPublisher) Publish(key, value []byte) error {
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
}
- publishClient, found := p.partition2Broker.Floor(hashKey+1, hashKey+1)
+ inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1)
if !found {
- return fmt.Errorf("no broker found for key %d", hashKey)
- }
- p.Lock()
- defer p.Unlock()
- // dead lock here
- //google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59)
- //google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047)
- //google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040)
- //google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892)
- //google.golang.org/grpc.(*clientStream).withRetry(stream.go:752)
- //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894)
- //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141)
- //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19)
- if err := publishClient.Send(&mq_pb.PublishMessageRequest{
- Message: &mq_pb.PublishMessageRequest_Data{
- Data: &mq_pb.DataMessage{
- Key: key,
- Value: value,
- },
- },
- }); err != nil {
- return fmt.Errorf("send publish request: %v", err)
+ return fmt.Errorf("no input buffer found for key %d", hashKey)
}
- return nil
+
+ return inputBuffer.Enqueue(&mq_pb.DataMessage{
+ Key: key,
+ Value: value,
+ })
}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 7dd3ab4d1..be29efa1c 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -1,10 +1,10 @@
package pub_client
import (
- "fmt"
"github.com/rdleal/intervalst/interval"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
@@ -25,6 +25,7 @@ type TopicPublisher struct {
namespace string
topic string
partition2Broker *interval.SearchTree[*PublishClient, int32]
+ partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32]
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
config *PublisherConfiguration
@@ -38,25 +39,14 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration)
partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
return int(a - b)
}),
+ partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int {
+ return int(a - b)
+ }),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
config: config,
}
}
-func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) {
- if len(bootstrapBrokers) == 0 {
- return nil
- }
- for _, b := range bootstrapBrokers {
- err = p.doLookupAndConnect(b)
- if err == nil {
- return nil
- }
- fmt.Printf("failed to connect to %s: %v\n\n", b, err)
- }
- return err
-}
-
func (p *TopicPublisher) Shutdown() error {
if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
@@ -64,6 +54,11 @@ func (p *TopicPublisher) Shutdown() error {
client.CloseSend()
}
}
+ if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
+ for _, inputBuffer := range inputBuffers {
+ inputBuffer.CloseInput()
+ }
+ }
time.Sleep(1100 * time.Millisecond)
return nil
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 226bc7272..e617af09f 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -6,6 +6,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "log"
"sort"
"sync"
"time"
@@ -22,13 +26,16 @@ type EachPartitionPublishJob struct {
stopChan chan bool
wg sync.WaitGroup
generation int
+ inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
}
func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
- return err
+ return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
}
+ log.Printf("start scheduler thread for topic %s/%s", p.namespace, p.topic)
+
generation := 0
var errChan chan EachPartitionError
for {
@@ -92,6 +99,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
BrokerPartitionAssignment: assignment,
stopChan: make(chan bool, 1),
generation: generation,
+ inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024, true),
}
job.wg.Add(1)
go func(job *EachPartitionPublishJob) {
@@ -101,12 +109,76 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
}
}(job)
jobs = append(jobs, job)
+ // TODO assuming this is not re-configured so the partitions are fixed.
+ // better just re-use the existing job
+ p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
}
p.jobs = jobs
}
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
+ log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
+
+ grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ stream, err := brokerClient.PublishMessage(context.Background())
+ if err != nil {
+ return fmt.Errorf("create publish client: %v", err)
+ }
+ publishClient := &PublishClient{
+ SeaweedMessaging_PublishMessageClient: stream,
+ Broker: job.LeaderBroker,
+ }
+ if err = publishClient.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Init{
+ Init: &mq_pb.PublishMessageRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: job.Partition,
+ AckInterval: 128,
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("send init message: %v", err)
+ }
+ resp, err := stream.Recv()
+ if err != nil {
+ return fmt.Errorf("recv init response: %v", err)
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("init response error: %v", resp.Error)
+ }
+
+ go func() {
+ for {
+ _, err := publishClient.Recv()
+ if err != nil {
+ e, ok := status.FromError(err)
+ if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
+ return
+ }
+ publishClient.Err = err
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
+ return
+ }
+ }
+ }()
+
+ for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
+ if err := publishClient.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Data{
+ Data: data,
+ },
+ }); err != nil {
+ return fmt.Errorf("send publish data: %v", err)
+ }
+ }
return nil
}