about summary refs log tree commit diff
path: root/weed/mq
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq')
-rw-r--r-- weed/mq/broker/broker_grpc_assign.go 9
-rw-r--r-- weed/mq/broker/broker_grpc_pub.go 135
-rw-r--r-- weed/mq/broker/broker_grpc_pub_follow.go 199
-rw-r--r-- weed/mq/broker/broker_grpc_sub.go 212
-rw-r--r-- weed/mq/broker/broker_grpc_sub_coordinator.go 9
-rw-r--r-- weed/mq/broker/broker_topic_conf_read_write.go 31
-rw-r--r-- weed/mq/broker/broker_topic_partition_read_write.go 10
-rw-r--r-- weed/mq/client/cmd/weed_pub/publisher.go 13
-rw-r--r-- weed/mq/client/pub_client/publish.go 17
-rw-r--r-- weed/mq/client/pub_client/publisher.go 1
-rw-r--r-- weed/mq/client/pub_client/scheduler.go 35
-rw-r--r-- weed/mq/client/sub_client/connect_to_sub_coordinator.go 21
-rw-r--r-- weed/mq/pub_balancer/balance_brokers_test.go 6
-rw-r--r-- weed/mq/pub_balancer/broker_stats.go 28
-rw-r--r-- weed/mq/pub_balancer/repair.go 9
-rw-r--r-- weed/mq/sub_coordinator/consumer_group.go 8
-rw-r--r-- weed/mq/topic/local_manager.go 15
-rw-r--r-- weed/mq/topic/local_partition.go 161
-rw-r--r-- weed/mq/topic/local_partition_publishers.go 7
-rw-r--r-- weed/mq/topic/local_partition_subscribers.go 7
20 files changed, 478 insertions, 455 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 264565b7b..ee69db30d 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -14,7 +14,6 @@ import (
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
ret := &mq_pb.AssignTopicPartitionsResponse{}
- self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
@@ -23,12 +22,12 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
b.accessLock.Lock()
if request.IsDraining {
// TODO drain existing topic partition subscriptions
- b.localTopicManager.RemoveTopicPartition(t, partition)
+ b.localTopicManager.RemoveLocalPartition(t, partition)
} else {
var localPartition *topic.LocalPartition
- if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
- b.localTopicManager.AddTopicPartition(t, localPartition)
+ if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ b.localTopicManager.AddLocalPartition(t, localPartition)
}
}
b.accessLock.Unlock()
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 3b68db1af..a217489de 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,13 +5,13 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
"io"
"math/rand"
"net"
"sync/atomic"
+ "time"
)
// PUB
@@ -40,73 +40,86 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// 2. find the topic metadata owning filer
// 3. write to the filer
- var localTopicPartition *topic.LocalPartition
req, err := stream.Recv()
if err != nil {
return err
}
response := &mq_pb.PublishMessageResponse{}
// TODO check whether current broker should be the leader for the topic partition
- ackInterval := 1
initMessage := req.GetInit()
- var t topic.Topic
- var p topic.Partition
- if initMessage != nil {
- t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p)
- if err != nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- return stream.Send(response)
- }
- ackInterval = int(initMessage.AckInterval)
- for _, follower := range initMessage.FollowerBrokers {
- followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
- Topic: initMessage.Topic,
- Partition: initMessage.Partition,
- BrokerSelf: string(b.option.BrokerAddress()),
- })
- return err
- })
- if followErr != nil {
- response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
- glog.Errorf("follower %v failed: %v", follower, followErr)
- return stream.Send(response)
- }
- }
- stream.Send(response)
- } else {
+ if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
glog.Errorf("missing init message")
return stream.Send(response)
}
+ // get or generate a local partition
+ t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+ localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
+ if getOrGenErr != nil {
+ response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
+ glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+ return stream.Send(response)
+ }
+
+ // connect to follower brokers
+ if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
+ response.Error = followerErr.Error()
+ glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
+ return stream.Send(response)
+ }
+
+ var receivedSequence, acknowledgedSequence int64
+ var isClosed bool
+
+ // start sending ack to publisher
+ ackInterval := int64(1)
+ if initMessage.AckInterval > 0 {
+ ackInterval = int64(initMessage.AckInterval)
+ }
+ go func() {
+ defer func() {
+ // println("stop sending ack to publisher", initMessage.PublisherName)
+ }()
+
+ lastAckTime := time.Now()
+ for !isClosed {
+ receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
+ if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
+ acknowledgedSequence = receivedSequence
+ response := &mq_pb.PublishMessageResponse{
+ AckSequence: acknowledgedSequence,
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending response %v: %v", response, err)
+ }
+ // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
+ lastAckTime = time.Now()
+ } else {
+ time.Sleep(1 * time.Second)
+ }
+ }
+ }()
+
+
+ // process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
- ackCounter := 0
- var ackSequence int64
- var isStopping int32
- respChan := make(chan *mq_pb.PublishMessageResponse, 128)
defer func() {
- atomic.StoreInt32(&isStopping, 1)
- respChan <- &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- }
- close(respChan)
+ // remove the publisher
localTopicPartition.Publishers.RemovePublisher(clientName)
- glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
if localTopicPartition.MaybeShutdownLocalPartition() {
- b.localTopicManager.RemoveTopicPartition(t, p)
+ b.localTopicManager.RemoveLocalPartition(t, p)
+ glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
}()
- go func() {
- for resp := range respChan {
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending response %v: %v", resp, err)
- }
- }
+
+ // send a hello message
+ stream.Send(&mq_pb.PublishMessageResponse{})
+
+ defer func() {
+ isClosed = true
}()
// process each published messages
@@ -117,28 +130,26 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err == io.EOF {
break
}
- glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
- return err
+ glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
+ break
}
// Process the received message
- if dataMessage := req.GetData(); dataMessage != nil {
- localTopicPartition.Publish(dataMessage)
+ dataMessage := req.GetData()
+ if dataMessage == nil {
+ continue
}
- ackCounter++
- ackSequence++
- if ackCounter >= ackInterval {
- ackCounter = 0
- // send back the ack
- response := &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- }
- respChan <- response
+ // The control message should still be sent to the follower
+ // to avoid timing issue when ack messages.
+
+ // send to the local partition
+ if err = localTopicPartition.Publish(dataMessage); err != nil {
+ return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
}
}
- glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
+ glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
return nil
}
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 8ef85110a..d8100f021 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -1,96 +1,151 @@
package broker
import (
- "context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"io"
- "math/rand"
- "sync"
"time"
)
-func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) {
- glog.V(0).Infof("PublishFollowMe %v", request)
- var wg sync.WaitGroup
- wg.Add(1)
- var ret error
- go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
- followerId := rand.Int31()
- subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
- Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
- Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
- ConsumerGroup: string(b.option.BrokerAddress()),
- ConsumerId: fmt.Sprintf("followMe-%d", followerId),
- FollowerId: followerId,
- Topic: request.Topic,
- PartitionOffset: &mq_pb.PartitionOffset{
- Partition: request.Partition,
- StartTsNs: 0,
- StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
- },
- },
- },
- })
+type memBuffer struct {
+ buf []byte
+ startTime time.Time
+ stopTime time.Time
+}
+func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
+ var req *mq_pb.PublishFollowMeRequest
+ req, err = stream.Recv()
+ if err != nil {
+ return err
+ }
+ initMessage := req.GetInit()
+ if initMessage == nil {
+ return fmt.Errorf("missing init message")
+ }
- if err != nil {
- glog.Errorf("FollowInMemoryMessages error: %v", err)
- ret = err
- return err
- }
+ // create an in-memory queue of buffered messages
+ inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
+ logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)
- // receive first hello message
- resp, err := subscribeClient.Recv()
+ lastFlushTsNs := time.Now().UnixNano()
+
+ // follow each published messages
+ for {
+ // receive a message
+ req, err = stream.Recv()
if err != nil {
- return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
- }
- if resp == nil {
- glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
- return io.ErrUnexpectedEOF
+ if err == io.EOF {
+ err = nil
+ break
+ }
+ glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ break
}
- wg.Done()
- b.doFollowInMemoryMessage(context.Background(), subscribeClient)
+ // Process the received message
+ if dataMessage := req.GetData(); dataMessage != nil {
- return nil
- })
- wg.Wait()
- return &mq_pb.PublishFollowMeResponse{}, ret
-}
+ // TODO: change this to DataMessage
+ // log the message
+ logBuffer.AddToBuffer(dataMessage)
-func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
- for {
- resp, err := client.Recv()
- if err != nil {
- if err != io.EOF {
- glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
+ // send back the ack
+ if err := stream.Send(&mq_pb.PublishFollowMeResponse{
+ AckTsNs: dataMessage.TsNs,
+ }); err != nil {
+ glog.Errorf("Error sending response %v: %v", dataMessage, err)
+ }
+ // println("ack", string(dataMessage.Key), dataMessage.TsNs)
+ } else if closeMessage := req.GetClose(); closeMessage != nil {
+ glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+ break
+ } else if flushMessage := req.GetFlush(); flushMessage != nil {
+ glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
+
+ lastFlushTsNs = flushMessage.TsNs
+
+ // drop already flushed messages
+ for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
+ if mem.stopTime.UnixNano() <= flushMessage.TsNs {
+ inMemoryBuffers.Dequeue()
+ // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
+ } else {
+ break
+ }
}
- return
+
+ } else {
+ glog.Errorf("unknown message: %v", req)
}
- if resp == nil {
- glog.V(0).Infof("doFollowInMemoryMessage nil response")
- return
+ }
+
+
+ t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+
+ logBuffer.ShutdownLogBuffer()
+ // wait until all messages are sent to inMemoryBuffers
+ for !logBuffer.IsAllFlushed() {
+ time.Sleep(113 * time.Millisecond)
+ }
+
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
+ partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
+
+
+ // flush the remaining messages
+ inMemoryBuffers.CloseInput()
+ for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
+ if len(mem.buf) == 0 {
+ continue
}
- if resp.Message != nil {
- // process ctrl message or data message
- switch m := resp.Message.(type) {
- case *mq_pb.FollowInMemoryMessagesResponse_Data:
- // process data message
- print("d")
- case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
- // process ctrl message
- if m.Ctrl.FlushedSequence > 0 {
- flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
- glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
- }
- if m.Ctrl.FollowerChangedToId != 0 {
- // follower changed
- glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
- return
- }
+
+ startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
+
+ if stopTime.UnixNano() <= lastFlushTsNs {
+ glog.V(0).Infof("dropping remaining data at %v %v", t, p)
+ continue
+ }
+
+ // TODO trim data earlier than lastFlushTsNs
+
+ targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
+
+ for {
+ if err := b.appendToFile(targetFile, mem.buf); err != nil {
+ glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ time.Sleep(737 * time.Millisecond)
+ } else {
+ break
}
}
+
+ glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
}
+
+ glog.V(0).Infof("shut down follower for %v %v", t, p)
+
+ return err
+}
+
+func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
+ lb := log_buffer.NewLogBuffer("follower",
+ 2*time.Minute, func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
+ if len(buf) == 0 {
+ return
+ }
+ inMemoryBuffers.Enqueue(memBuffer{
+ buf: buf,
+ startTime: startTime,
+ stopTime: stopTime,
+ })
+ glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
+ }, nil, func() {
+ })
+ return lb
}
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 1141ff47f..02488b2b0 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,7 +8,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "sync/atomic"
"time"
)
@@ -17,40 +16,20 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
ctx := stream.Context()
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+ initMessage := req.GetInit()
+ if initMessage == nil {
+ glog.Errorf("missing init message")
+ return fmt.Errorf("missing init message")
+ }
+
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
- waitIntervalCount := 0
-
- var localTopicPartition *topic.LocalPartition
- for localTopicPartition == nil {
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
- if err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
- if localTopicPartition != nil {
- break
- }
- waitIntervalCount++
- if waitIntervalCount > 10 {
- waitIntervalCount = 10
- }
- time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
- }
+ localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
+ if getOrGenErr != nil {
+ return getOrGenErr
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
@@ -64,7 +43,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
- b.localTopicManager.RemoveTopicPartition(t, partition)
+ b.localTopicManager.RemoveLocalPartition(t, partition)
}
}()
@@ -129,174 +108,3 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
}
return
}
-
-func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
- ctx := stream.Context()
- clientName := req.GetInit().ConsumerId
-
- t := topic.FromPbTopic(req.GetInit().Topic)
- partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
-
- glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
-
- waitIntervalCount := 0
-
- var localTopicPartition *topic.LocalPartition
- for localTopicPartition == nil {
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
- if err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
- if localTopicPartition != nil {
- break
- }
- waitIntervalCount++
- if waitIntervalCount > 32 {
- waitIntervalCount = 32
- }
- time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
- }
- }
-
- // set the current follower id
- followerId := req.GetInit().FollowerId
- atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
-
- glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
- isConnected := true
- sleepIntervalCount := 0
-
- var counter int64
- defer func() {
- isConnected = false
- glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
- }()
-
- // send first hello message
- // to indicate the follower is connected
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
- },
- })
-
- var startPosition log_buffer.MessagePosition
- if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
- startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
- }
-
- var prevFlushTsNs int64
-
- _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
- if !isConnected {
- return false
- }
- sleepIntervalCount++
- if sleepIntervalCount > 32 {
- sleepIntervalCount = 32
- }
- time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
-
- if localTopicPartition.LogBuffer.IsStopping() {
- newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
- glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FollowerChangedToId: newFollowerId,
- },
- },
- })
- return false
- }
-
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return false
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return false
- default:
- // Continue processing the request
- }
-
- // send the last flushed sequence
- flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
- if flushTsNs != prevFlushTsNs {
- prevFlushTsNs = flushTsNs
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FlushedSequence: flushTsNs,
- },
- },
- })
- }
-
- return true
- }, func(logEntry *filer_pb.LogEntry) (bool, error) {
- // reset the sleep interval count
- sleepIntervalCount = 0
-
- // check the follower id
- newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
- if newFollowerId != followerId {
- glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FollowerChangedToId: newFollowerId,
- },
- },
- })
- return true, nil
- }
-
- // send the last flushed sequence
- flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
- if flushTsNs != prevFlushTsNs {
- prevFlushTsNs = flushTsNs
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FlushedSequence: flushTsNs,
- },
- },
- })
- }
-
- // send the log entry
- if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
- Data: &mq_pb.DataMessage{
- Key: logEntry.Key,
- Value: logEntry.Data,
- TsNs: logEntry.TsNs,
- },
- }}); err != nil {
- glog.Errorf("Error sending setup response: %v", err)
- return false, err
- }
-
- counter++
- return false, nil
- })
-
- return err
-}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 89c221af5..3fd97f1c2 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -39,18 +39,11 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
go func() {
// try to load the partition assignment from filer
if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil {
- assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(conf.BrokerPartitionAssignments))
- for i, assignment := range conf.BrokerPartitionAssignments {
- assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
- Partition: assignment.Partition,
- Broker: assignment.LeaderBroker,
- }
- }
// send partition assignment to subscriber
cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
- AssignedPartitions: assignedPartitions,
+ PartitionAssignments: conf.BrokerPartitionAssignments,
},
},
}
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 35d95c0e4..cddd6cf1c 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -56,12 +56,27 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
-func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
+func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
+ // get or generate a local partition
+ conf, readConfErr := b.readTopicConfFromFiler(t)
+ if readConfErr != nil {
+ glog.Errorf("topic %v not found: %v", t, readConfErr)
+ return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
+ }
+ localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
+ if getOrGenError != nil {
+ glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ }
+ return localTopicPartition, nil
+}
+
+func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
b.accessLock.Lock()
defer b.accessLock.Unlock()
- if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition)
+ if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
+ localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf)
if err != nil {
return nil, false, err
}
@@ -69,16 +84,12 @@ func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition top
return localPartition, isGenerated, nil
}
-func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
+func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
self := b.option.BrokerAddress()
- conf, err := b.readTopicConfFromFiler(t)
- if err != nil {
- return nil, isGenerated, err
- }
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
- localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
- b.localTopicManager.AddTopicPartition(t, localPartition)
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ b.localTopicManager.AddLocalPartition(t, localPartition)
isGenerated = true
break
}
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index a058d8da5..50470f879 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -41,6 +41,16 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
}
atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
+
+ b.accessLock.Lock()
+ defer b.accessLock.Unlock()
+ p := topic.FromPbPartition(partition)
+ if localPartition:=b.localTopicManager.GetLocalPartition(t, p); localPartition!=nil {
+ localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
+ }
+
+ println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf))
+
}
}
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 2873ba21f..e9227130a 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -16,6 +16,8 @@ var (
concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count")
+ clientName = flag.String("client", "c1", "client name")
+
namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
@@ -25,16 +27,20 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
startTime := time.Now()
for i := 0; i < *messageCount / *concurrency; i++ {
// Simulate publishing a message
- key := []byte(fmt.Sprintf("key-%d-%d", id, i))
- value := []byte(fmt.Sprintf("value-%d-%d", id, i))
+ key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i))
+ value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i))
if err := publisher.Publish(key, value); err != nil {
fmt.Println(err)
break
}
+ time.Sleep(time.Second)
// println("Published", string(key), string(value))
}
+ if err := publisher.FinishPublish(); err != nil {
+ fmt.Println(err)
+ }
elapsed := time.Since(startTime)
- log.Printf("Publisher %d finished in %s", id, elapsed)
+ log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
}
func main() {
@@ -44,6 +50,7 @@ func main() {
CreateTopic: true,
CreateTopicPartitionCount: int32(*partitionCount),
Brokers: strings.Split(*seedBrokers, ","),
+ PublisherName: *clientName,
}
publisher := pub_client.NewTopicPublisher(config)
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1c5891049..fbb07b042 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "time"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
@@ -20,5 +21,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
return inputBuffer.Enqueue(&mq_pb.DataMessage{
Key: key,
Value: value,
+ TsNs: time.Now().UnixNano(),
})
}
+
+func (p *TopicPublisher) FinishPublish() error {
+ if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
+ for _, inputBuffer := range inputBuffers {
+ inputBuffer.Enqueue(&mq_pb.DataMessage{
+ TsNs: time.Now().UnixNano(),
+ Ctrl: &mq_pb.ControlMessage{
+ IsClose: true,
+ },
+ })
+ }
+ }
+
+ return nil
+}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 9262d6e0c..09984bae3 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -17,6 +17,7 @@ type PublisherConfiguration struct {
CreateTopic bool
CreateTopicPartitionCount int32
Brokers []string
+ PublisherName string // for debugging
}
type PublishClient struct {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e6caf896c..e92e07ab5 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
"log"
"sort"
"sync"
+ "sync/atomic"
"time"
)
@@ -145,11 +146,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Partition: job.Partition,
AckInterval: 128,
FollowerBrokers: job.FollowerBrokers,
+ PublisherName: p.config.PublisherName,
},
},
}); err != nil {
return fmt.Errorf("send init message: %v", err)
}
+ // process the hello message
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv init response: %v", err)
@@ -158,31 +161,44 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("init response error: %v", resp.Error)
}
+ var publishedTsNs int64
+ hasMoreData := int32(1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
ackResp, err := publishClient.Recv()
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Printf("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d", ackResp.AckSequence)
+ log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ }
+ if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ return
}
}
}()
publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
+ if data.Ctrl != nil && data.Ctrl.IsClose {
+ // need to set this before sending to brokers, to avoid timing issue
+ atomic.StoreInt32(&hasMoreData, 0)
+ }
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: data,
@@ -191,14 +207,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("send publish data: %v", err)
}
publishCounter++
+ atomic.StoreInt64(&publishedTsNs, data.TsNs)
}
-
- if err := publishClient.CloseSend(); err != nil {
- return fmt.Errorf("close send: %v", err)
+ if publishCounter > 0 {
+ wg.Wait()
+ } else {
+ // CloseSend would cancel the context on the server side
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
+ }
}
- time.Sleep(3 * time.Second)
-
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 4cc3c8ff2..b0b533e42 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -91,26 +91,26 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
var wg sync.WaitGroup
semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
- for _, assigned := range assignment.AssignedPartitions {
+ for _, assigned := range assignment.PartitionAssignments {
wg.Add(1)
semaphore <- struct{}{}
- go func(partition *mq_pb.Partition, broker string) {
+ go func(assigned *mq_pb.BrokerPartitionAssignment) {
defer wg.Done()
defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
- err := sub.onEachPartition(partition, broker)
+ glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ err := sub.onEachPartition(assigned)
if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err)
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
}
- }(assigned.Partition, assigned.Broker)
+ }(assigned)
}
wg.Wait()
}
-func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error {
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
// connect to the partition broker
- return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
@@ -118,11 +118,12 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
PartitionOffset: &mq_pb.PartitionOffset{
- Partition: partition,
+ Partition: assigned.Partition,
StartTsNs: sub.alreadyProcessedTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
Filter: sub.ContentConfig.Filter,
+ FollowerBrokers: assigned.FollowerBrokers,
},
},
})
@@ -131,7 +132,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt.Errorf("create subscribe client: %v", err)
}
- glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
+ glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go
index 54667d154..122984f0d 100644
--- a/weed/mq/pub_balancer/balance_brokers_test.go
+++ b/weed/mq/pub_balancer/balance_brokers_test.go
@@ -21,8 +21,6 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024},
},
- ConsumerCount: 1,
- IsLeader: true,
})
broker2Stats := &BrokerStats{
TopicPartitionCount: 2,
@@ -35,16 +33,12 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024},
},
- ConsumerCount: 1,
- IsLeader: true,
})
broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: "topic2", Name: "topic2"},
Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024},
},
- ConsumerCount: 1,
- IsLeader: true,
})
brokers.Set("broker1", broker1Stats)
brokers.Set("broker2", broker2Stats)
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index b4bb28e42..00f1f80ca 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -9,15 +9,16 @@ import (
type BrokerStats struct {
TopicPartitionCount int32
- ConsumerCount int32
+ PublisherCount int32
+ SubscriberCount int32
CpuUsagePercent int32
TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
Topics []topic.Topic
}
type TopicPartitionStats struct {
topic.TopicPartition
- ConsumerCount int32
- IsLeader bool
+ PublisherCount int32
+ SubscriberCount int32
}
func NewBrokerStats() *BrokerStats {
@@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats {
}
}
func (bs *BrokerStats) String() string {
- return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
- bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
+ return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
+ bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
}
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
bs.TopicPartitionCount = int32(len(stats.Stats))
bs.CpuUsagePercent = stats.CpuUsagePercent
- var consumerCount int32
+ var publisherCount, subscriberCount int32
currentTopicPartitions := bs.TopicPartitionStats.Items()
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
@@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
},
- ConsumerCount: topicPartitionStats.ConsumerCount,
- IsLeader: topicPartitionStats.IsLeader,
+ PublisherCount: topicPartitionStats.PublisherCount,
+ SubscriberCount: topicPartitionStats.SubscriberCount,
}
- consumerCount += topicPartitionStats.ConsumerCount
+ publisherCount += topicPartitionStats.PublisherCount
+ subscriberCount += topicPartitionStats.SubscriberCount
key := tps.TopicPartition.String()
bs.TopicPartitionStats.Set(key, tps)
delete(currentTopicPartitions, key)
@@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for key := range currentTopicPartitions {
bs.TopicPartitionStats.Remove(key)
}
- bs.ConsumerCount = consumerCount
-
+ bs.PublisherCount = publisherCount
+ bs.SubscriberCount = subscriberCount
}
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
@@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
UnixTimeNs: partition.UnixTimeNs,
},
},
- ConsumerCount: 0,
- IsLeader: true,
+ PublisherCount: 0,
+ SubscriberCount: 0,
}
key := tps.TopicPartition.String()
if isAdd {
diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go
index 0ab1a5ea9..0f307c9eb 100644
--- a/weed/mq/pub_balancer/repair.go
+++ b/weed/mq/pub_balancer/repair.go
@@ -14,8 +14,7 @@ func (balancer *Balancer) RepairTopics() []BalanceAction {
}
type TopicPartitionInfo struct {
- Leader string
- Followers []string
+ Broker string
}
// RepairMissingTopicPartitions check the stats of all brokers,
@@ -38,11 +37,7 @@ func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStat
tpi = &TopicPartitionInfo{}
topicPartitionToInfo[topicPartitionStat.Partition] = tpi
}
- if topicPartitionStat.IsLeader {
- tpi.Leader = broker
- } else {
- tpi.Followers = append(tpi.Followers, broker)
- }
+ tpi.Broker = broker
}
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index f897fe2b3..d24a38d8a 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -103,22 +103,22 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
}
consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots)
- assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots))
+ assignedPartitions := make([]*mq_pb.BrokerPartitionAssignment, len(partitionSlots))
for i, partitionSlot := range partitionSlots {
- assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
+ assignedPartitions[i] = &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{
RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart,
RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: partitionSlot.UnixTimeNs,
},
- Broker: partitionSlot.Broker,
+ LeaderBroker: partitionSlot.Broker,
}
}
response := &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
- AssignedPartitions: assignedPartitions,
+ PartitionAssignments: assignedPartitions,
},
},
}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index aa2eefcdc..79a84561c 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager {
}
}
-// AddTopicPartition adds a topic to the local topic manager
-func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
+// AddLocalPartition adds a topic to the local topic manager
+func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
localTopic = NewLocalTopic(topic)
@@ -34,8 +34,8 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
localTopic.Partitions = append(localTopic.Partitions, localPartition)
}
-// GetTopicPartition gets a topic from the local topic manager
-func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
+// GetLocalPartition gets a topic from the local topic manager
+func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
return nil
@@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
manager.topics.Remove(topic.String())
}
-func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
+func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
return false
@@ -96,8 +96,9 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
Namespace: string(localTopic.Namespace),
Name: localTopic.Name,
},
- Partition: localPartition.Partition.ToPbPartition(),
- ConsumerCount: localPartition.ConsumerCount,
+ Partition: localPartition.Partition.ToPbPartition(),
+ PublisherCount: int32(localPartition.Publishers.Size()),
+ SubscriberCount: int32(localPartition.Subscribers.Size()),
}
// fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition)
}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 798949736..54c122a0f 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -1,42 +1,74 @@
package topic
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
"sync/atomic"
"time"
)
type LocalPartition struct {
+ ListenersWaits int64
+ AckTsNs int64
+
+ // notifying clients
+ ListenersLock sync.Mutex
+ ListenersCond *sync.Cond
+
Partition
- isLeader bool
- FollowerBrokers []pb.ServerAddress
LogBuffer *log_buffer.LogBuffer
- ConsumerCount int32
Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers
- FollowerId int32
+
+ followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
+ followerGrpcConnection *grpc.ClientConn
+ follower string
}
var TIME_FORMAT = "2006-01-02-15-04-05"
-func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
- return &LocalPartition{
+func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
+ lp := &LocalPartition{
Partition: partition,
- isLeader: isLeader,
- FollowerBrokers: followerBrokers,
- LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
- 2*time.Minute, logFlushFn, readFromDiskFn, func() {}),
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
+ lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+ lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
+ 2*time.Minute, logFlushFn, readFromDiskFn, func() {
+ if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
+ lp.ListenersCond.Broadcast()
+ }
+ })
+ return lp
}
-func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
- p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
+ p.LogBuffer.AddToBuffer(message)
+
+ // maybe send to the follower
+ if p.followerStream != nil {
+ // println("recv", string(message.Key), message.TsNs)
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Data{
+ Data: message,
+ },
+ }); followErr != nil {
+ return fmt.Errorf("send to follower %s: %v", p.follower, followErr)
+ }
+ } else {
+ atomic.StoreInt64(&p.AckTsNs, message.TsNs)
+ }
+
+ return nil
}
func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
@@ -85,15 +117,6 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
return p.LogBuffer.GetEarliestPosition()
}
-func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
- isLeader := assignment.LeaderBroker == string(self)
- followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
- for i, followerBroker := range assignment.FollowerBrokers {
- followers[i] = pb.ServerAddress(followerBroker)
- }
- return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn)
-}
-
func (p *LocalPartition) closePublishers() {
p.Publishers.SignalShutdown()
}
@@ -103,18 +126,93 @@ func (p *LocalPartition) closeSubscribers() {
func (p *LocalPartition) WaitUntilNoPublishers() {
for {
- if p.Publishers.IsEmpty() {
+ if p.Publishers.Size() == 0 {
return
}
time.Sleep(113 * time.Millisecond)
}
}
+func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
+ if p.followerStream != nil {
+ return nil
+ }
+ if len(initMessage.FollowerBrokers) == 0 {
+ return nil
+ }
+
+ p.follower = initMessage.FollowerBrokers[0]
+ ctx := context.Background()
+ p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", p.follower, err)
+ }
+ followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
+ p.followerStream, err = followerClient.PublishFollowMe(ctx)
+ if err != nil {
+ return fmt.Errorf("fail to create publish client: %v", err)
+ }
+ if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Init{
+ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
+ },
+ },
+ }); err != nil {
+ return err
+ }
+
+ // start receiving ack from follower
+ go func() {
+ defer func() {
+ // println("stop receiving ack from follower")
+ }()
+
+ for {
+ ack, err := p.followerStream.Recv()
+ if err != nil {
+ e, _ := status.FromError(err)
+ if e.Code() == codes.Canceled {
+ glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.follower)
+ return
+ }
+ glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.follower, err)
+ return
+ }
+ atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
+ // println("recv ack", ack.AckTsNs)
+ }
+ }()
+ return nil
+}
+
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
- if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
+
+ if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
p.LogBuffer.ShutdownLogBuffer()
+ for !p.LogBuffer.IsAllFlushed() {
+ time.Sleep(113 * time.Millisecond)
+ }
+ if p.followerStream != nil {
+ // send close to the follower
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ glog.Errorf("Error closing follower stream: %v", followErr)
+ }
+ glog.V(4).Infof("closing grpcConnection to follower")
+ p.followerGrpcConnection.Close()
+ p.followerStream = nil
+ p.follower = ""
+ }
+
hasShutdown = true
}
+
+ glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.follower, hasShutdown)
return
}
@@ -122,5 +220,20 @@ func (p *LocalPartition) Shutdown() {
p.closePublishers()
p.closeSubscribers()
p.LogBuffer.ShutdownLogBuffer()
- atomic.StoreInt32(&p.FollowerId, 0)
+ glog.V(0).Infof("local partition %v shutting down", p.Partition)
+}
+
+func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
+ if p.followerStream != nil {
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Flush{
+ Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
+ TsNs: flushTsNs,
+ },
+ },
+ }); followErr != nil {
+ glog.Errorf("send follower %s flush message: %v", p.follower, followErr)
+ }
+ // println("notifying", p.follower, "flushed at", flushTsNs)
+ }
}
diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go
index c12f66336..e3c4e3ca6 100644
--- a/weed/mq/topic/local_partition_publishers.go
+++ b/weed/mq/topic/local_partition_publishers.go
@@ -44,13 +44,6 @@ func (p *LocalPartitionPublishers) SignalShutdown() {
}
}
-func (p *LocalPartitionPublishers) IsEmpty() bool {
- p.publishersLock.RLock()
- defer p.publishersLock.RUnlock()
-
- return len(p.publishers) == 0
-}
-
func (p *LocalPartitionPublishers) Size() int {
p.publishersLock.RLock()
defer p.publishersLock.RUnlock()
diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go
index d3b989d72..24341ce7e 100644
--- a/weed/mq/topic/local_partition_subscribers.go
+++ b/weed/mq/topic/local_partition_subscribers.go
@@ -48,13 +48,6 @@ func (p *LocalPartitionSubscribers) SignalShutdown() {
}
}
-func (p *LocalPartitionSubscribers) IsEmpty() bool {
- p.SubscribersLock.RLock()
- defer p.SubscribersLock.RUnlock()
-
- return len(p.Subscribers) == 0
-}
-
func (p *LocalPartitionSubscribers) Size() int {
p.SubscribersLock.RLock()
defer p.SubscribersLock.RUnlock()