Diffstat (limited to 'weed/mq')
20 files changed, 478 insertions, 455 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 264565b7b..ee69db30d 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -14,7 +14,6 @@ import (
 // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
 func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
     ret := &mq_pb.AssignTopicPartitionsResponse{}
-    self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))

     // drain existing topic partition subscriptions
     for _, assignment := range request.BrokerPartitionAssignments {
@@ -23,12 +22,12 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
         b.accessLock.Lock()
         if request.IsDraining {
             // TODO drain existing topic partition subscriptions
-            b.localTopicManager.RemoveTopicPartition(t, partition)
+            b.localTopicManager.RemoveLocalPartition(t, partition)
         } else {
             var localPartition *topic.LocalPartition
-            if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
-                localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
-                b.localTopicManager.AddTopicPartition(t, localPartition)
+            if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
+                localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+                b.localTopicManager.AddLocalPartition(t, localPartition)
             }
         }
         b.accessLock.Unlock()
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 3b68db1af..a217489de 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,13 +5,13 @@ import (
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/topic"
-    "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
     "google.golang.org/grpc/peer"
     "io"
     "math/rand"
     "net"
     "sync/atomic"
+    "time"
 )

 // PUB
@@ -40,73 +40,86 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
     // 2. find the topic metadata owning filer
     // 3. write to the filer

-    var localTopicPartition *topic.LocalPartition
     req, err := stream.Recv()
     if err != nil {
         return err
     }
     response := &mq_pb.PublishMessageResponse{}
     // TODO check whether current broker should be the leader for the topic partition
-    ackInterval := 1
     initMessage := req.GetInit()
-    var t topic.Topic
-    var p topic.Partition
-    if initMessage != nil {
-        t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
-        localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p)
-        if err != nil {
-            response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
-            glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
-            return stream.Send(response)
-        }
-        ackInterval = int(initMessage.AckInterval)
-        for _, follower := range initMessage.FollowerBrokers {
-            followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
-                _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
-                    Topic:      initMessage.Topic,
-                    Partition:  initMessage.Partition,
-                    BrokerSelf: string(b.option.BrokerAddress()),
-                })
-                return err
-            })
-            if followErr != nil {
-                response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
-                glog.Errorf("follower %v failed: %v", follower, followErr)
-                return stream.Send(response)
-            }
-        }
-        stream.Send(response)
-    } else {
+    if initMessage == nil {
         response.Error = fmt.Sprintf("missing init message")
         glog.Errorf("missing init message")
         return stream.Send(response)
     }

+    // get or generate a local partition
+    t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+    localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
+    if getOrGenErr != nil {
+        response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
+        glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+        return stream.Send(response)
+    }
+
+    // connect to follower brokers
+    if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
+        response.Error = followerErr.Error()
+        glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
+        return stream.Send(response)
+    }
+
+    var receivedSequence, acknowledgedSequence int64
+    var isClosed bool
+
+    // start sending ack to publisher
+    ackInterval := int64(1)
+    if initMessage.AckInterval > 0 {
+        ackInterval = int64(initMessage.AckInterval)
+    }
+    go func() {
+        defer func() {
+            // println("stop sending ack to publisher", initMessage.PublisherName)
+        }()
+
+        lastAckTime := time.Now()
+        for !isClosed {
+            receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
+            if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
+                acknowledgedSequence = receivedSequence
+                response := &mq_pb.PublishMessageResponse{
+                    AckSequence: acknowledgedSequence,
+                }
+                if err := stream.Send(response); err != nil {
+                    glog.Errorf("Error sending response %v: %v", response, err)
+                }
+                // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
+                lastAckTime = time.Now()
+            } else {
+                time.Sleep(1 * time.Second)
+            }
+        }
+    }()
+
+    // process each published messages
     clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
     localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())

-    ackCounter := 0
-    var ackSequence int64
-    var isStopping int32
-    respChan := make(chan *mq_pb.PublishMessageResponse, 128)
     defer func() {
-        atomic.StoreInt32(&isStopping, 1)
-        respChan <- &mq_pb.PublishMessageResponse{
-            AckSequence: ackSequence,
-        }
-        close(respChan)
+        // remove the publisher
         localTopicPartition.Publishers.RemovePublisher(clientName)
-        glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
         if localTopicPartition.MaybeShutdownLocalPartition() {
-            b.localTopicManager.RemoveTopicPartition(t, p)
+            b.localTopicManager.RemoveLocalPartition(t, p)
+            glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
         }
     }()
-    go func() {
-        for resp := range respChan {
-            if err := stream.Send(resp); err != nil {
-                glog.Errorf("Error sending response %v: %v", resp, err)
-            }
-        }
+
+    // send a hello message
+    stream.Send(&mq_pb.PublishMessageResponse{})
+
+    defer func() {
+        isClosed = true
     }()

     // process each published messages
@@ -117,28 +130,26 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
             if err == io.EOF {
                 break
             }
-            glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
-            return err
+            glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
+            break
         }

         // Process the received message
-        if dataMessage := req.GetData(); dataMessage != nil {
-            localTopicPartition.Publish(dataMessage)
+        dataMessage := req.GetData()
+        if dataMessage == nil {
+            continue
         }

-        ackCounter++
-        ackSequence++
-        if ackCounter >= ackInterval {
-            ackCounter = 0
-            // send back the ack
-            response := &mq_pb.PublishMessageResponse{
-                AckSequence: ackSequence,
-            }
-            respChan <- response
+        // The control message should still be sent to the follower
+        // to avoid timing issue when ack messages.
+
+        // send to the local partition
+        if err = localTopicPartition.Publish(dataMessage); err != nil {
+            return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
         }
     }

-    glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
+    glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)

     return nil
 }
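The reworked PublishMessage above drops the respChan/ackCounter machinery: the write path advances a single high-watermark timestamp (LocalPartition.AckTsNs), and one goroutine acks from it, either once the watermark moves at least AckInterval ahead or at least once a second. A runnable standalone sketch of that watermark pattern follows; ackLoop, watermark, and closed are hypothetical names for illustration, not broker API:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // ackLoop acks whenever the watermark advances by at least ackInterval,
    // or once a second while anything is still unacknowledged.
    func ackLoop(send func(tsNs int64) error, watermark *int64, ackInterval int64, closed *int32) {
        var acked int64
        last := time.Now()
        for atomic.LoadInt32(closed) == 0 {
            received := atomic.LoadInt64(watermark)
            if acked < received && (received-acked >= ackInterval || time.Since(last) > time.Second) {
                acked = received
                if send(acked) != nil {
                    return
                }
                last = time.Now()
            } else {
                time.Sleep(10 * time.Millisecond) // the broker polls at 1s; shortened for the demo
            }
        }
    }

    func main() {
        var watermark int64
        var closed int32
        go ackLoop(func(tsNs int64) error { fmt.Println("ack", tsNs); return nil }, &watermark, 128, &closed)
        for i := 1; i <= 512; i++ {
            atomic.StoreInt64(&watermark, int64(i)) // the write path (Publish) advances the watermark
        }
        time.Sleep(100 * time.Millisecond)
        atomic.StoreInt32(&closed, 1)
    }

Because only the latest watermark matters, a burst of publishes collapses into a single ack, which is what lets the broker drop the per-message response channel.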
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 8ef85110a..d8100f021 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -1,96 +1,151 @@
 package broker

 import (
-    "context"
     "fmt"
+    "github.com/seaweedfs/seaweedfs/weed/filer"
     "github.com/seaweedfs/seaweedfs/weed/glog"
-    "github.com/seaweedfs/seaweedfs/weed/pb"
+    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
+    "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
     "io"
-    "math/rand"
-    "sync"
     "time"
 )

-func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) {
-    glog.V(0).Infof("PublishFollowMe %v", request)
-    var wg sync.WaitGroup
-    wg.Add(1)
-    var ret error
-    go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
-        followerId := rand.Int31()
-        subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
-            Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
-                Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
-                    ConsumerGroup: string(b.option.BrokerAddress()),
-                    ConsumerId:    fmt.Sprintf("followMe-%d", followerId),
-                    FollowerId:    followerId,
-                    Topic:         request.Topic,
-                    PartitionOffset: &mq_pb.PartitionOffset{
-                        Partition: request.Partition,
-                        StartTsNs: 0,
-                        StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
-                    },
-                },
-            },
-        })
+type memBuffer struct {
+    buf       []byte
+    startTime time.Time
+    stopTime  time.Time
+}

+func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
+    var req *mq_pb.PublishFollowMeRequest
+    req, err = stream.Recv()
+    if err != nil {
+        return err
+    }
+    initMessage := req.GetInit()
+    if initMessage == nil {
+        return fmt.Errorf("missing init message")
+    }

-        if err != nil {
-            glog.Errorf("FollowInMemoryMessages error: %v", err)
-            ret = err
-            return err
-        }
+    // create an in-memory queue of buffered messages
+    inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
+    logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)

-        // receive first hello message
-        resp, err := subscribeClient.Recv()
+    lastFlushTsNs := time.Now().UnixNano()
+
+    // follow each published messages
+    for {
+        // receive a message
+        req, err = stream.Recv()
         if err != nil {
-            return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
-        }
-        if resp == nil {
-            glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
-            return io.ErrUnexpectedEOF
+            if err == io.EOF {
+                err = nil
+                break
+            }
+            glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
+            break
         }
-        wg.Done()

-        b.doFollowInMemoryMessage(context.Background(), subscribeClient)
+        // Process the received message
+        if dataMessage := req.GetData(); dataMessage != nil {

-        return nil
-    })
-    wg.Wait()
-    return &mq_pb.PublishFollowMeResponse{}, ret
-}
+            // TODO: change this to DataMessage
+            // log the message
+            logBuffer.AddToBuffer(dataMessage)

-func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
-    for {
-        resp, err := client.Recv()
-        if err != nil {
-            if err != io.EOF {
-                glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
+            // send back the ack
+            if err := stream.Send(&mq_pb.PublishFollowMeResponse{
+                AckTsNs: dataMessage.TsNs,
+            }); err != nil {
+                glog.Errorf("Error sending response %v: %v", dataMessage, err)
+            }
+            // println("ack", string(dataMessage.Key), dataMessage.TsNs)
+        } else if closeMessage := req.GetClose(); closeMessage != nil {
+            glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+            break
+        } else if flushMessage := req.GetFlush(); flushMessage != nil {
+            glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
+
+            lastFlushTsNs = flushMessage.TsNs
+
+            // drop already flushed messages
+            for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
+                if mem.stopTime.UnixNano() <= flushMessage.TsNs {
+                    inMemoryBuffers.Dequeue()
+                    // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
+                } else {
+                    break
+                }
             }
-            return
+
+        } else {
+            glog.Errorf("unknown message: %v", req)
         }
-        if resp == nil {
-            glog.V(0).Infof("doFollowInMemoryMessage nil response")
-            return
+    }
+
+    t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+
+    logBuffer.ShutdownLogBuffer()
+    // wait until all messages are sent to inMemoryBuffers
+    for !logBuffer.IsAllFlushed() {
+        time.Sleep(113 * time.Millisecond)
+    }
+
+    topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+    partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
+    partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
+
+    // flush the remaining messages
+    inMemoryBuffers.CloseInput()
+    for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
+        if len(mem.buf) == 0 {
+            continue
         }
-        if resp.Message != nil {
-            // process ctrl message or data message
-            switch m := resp.Message.(type) {
-            case *mq_pb.FollowInMemoryMessagesResponse_Data:
-                // process data message
-                print("d")
-            case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
-                // process ctrl message
-                if m.Ctrl.FlushedSequence > 0 {
-                    flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
-                    glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
-                }
-                if m.Ctrl.FollowerChangedToId != 0 {
-                    // follower changed
-                    glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
-                    return
-                }
+
+        startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
+
+        if stopTime.UnixNano() <= lastFlushTsNs {
+            glog.V(0).Infof("dropping remaining data at %v %v", t, p)
+            continue
+        }
+
+        // TODO trim data earlier than lastFlushTsNs
+
+        targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
+
+        for {
+            if err := b.appendToFile(targetFile, mem.buf); err != nil {
+                glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+                time.Sleep(737 * time.Millisecond)
+            } else {
+                break
             }
         }
+
+        glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
     }
+
+    glog.V(0).Infof("shut down follower for %v %v", t, p)
+
+    return err
+}
+
+func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
+    lb := log_buffer.NewLogBuffer("follower",
+        2*time.Minute, func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
+            if len(buf) == 0 {
+                return
+            }
+            inMemoryBuffers.Enqueue(memBuffer{
+                buf:       buf,
+                startTime: startTime,
+                stopTime:  stopTime,
+            })
+            glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
+        }, nil, func() {
+        })
+    return lb
 }
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 1141ff47f..02488b2b0 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,7 +8,6 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
     "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
-    "sync/atomic"
     "time"
 )

@@ -17,40 +16,20 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
     ctx := stream.Context()
     clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)

+    initMessage := req.GetInit()
+    if initMessage == nil {
+        glog.Errorf("missing init message")
+        return fmt.Errorf("missing init message")
+    }
+
     t := topic.FromPbTopic(req.GetInit().Topic)
     partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())

     glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)

-    waitIntervalCount := 0
-
-    var localTopicPartition *topic.LocalPartition
-    for localTopicPartition == nil {
-        localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
-        if err != nil {
-            glog.V(1).Infof("topic %v partition %v not setup", t, partition)
-        }
-        if localTopicPartition != nil {
-            break
-        }
-        waitIntervalCount++
-        if waitIntervalCount > 10 {
-            waitIntervalCount = 10
-        }
-        time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
-        // Check if the client has disconnected by monitoring the context
-        select {
-        case <-ctx.Done():
-            err := ctx.Err()
-            if err == context.Canceled {
-                // Client disconnected
-                return nil
-            }
-            glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
-            return nil
-        default:
-            // Continue processing the request
-        }
+    localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
+    if getOrGenErr != nil {
+        return getOrGenErr
     }

     localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
@@ -64,7 +43,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
         localTopicPartition.Subscribers.RemoveSubscriber(clientName)
         glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
         if localTopicPartition.MaybeShutdownLocalPartition() {
-            b.localTopicManager.RemoveTopicPartition(t, partition)
+            b.localTopicManager.RemoveLocalPartition(t, partition)
         }
     }()

@@ -129,174 +108,3 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
     }
     return
 }
-
-func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
-    ctx := stream.Context()
-    clientName := req.GetInit().ConsumerId
-
-    t := topic.FromPbTopic(req.GetInit().Topic)
-    partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
-
-    glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
-
-    waitIntervalCount := 0
-
-    var localTopicPartition *topic.LocalPartition
-    for localTopicPartition == nil {
-        localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
-        if err != nil {
-            glog.V(1).Infof("topic %v partition %v not setup", t, partition)
-        }
-        if localTopicPartition != nil {
-            break
-        }
-        waitIntervalCount++
-        if waitIntervalCount > 32 {
-            waitIntervalCount = 32
-        }
-        time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
-        // Check if the client has disconnected by monitoring the context
-        select {
-        case <-ctx.Done():
-            err := ctx.Err()
-            if err == context.Canceled {
-                // Client disconnected
-                return nil
-            }
-            glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
-            return nil
-        default:
-            // Continue processing the request
-        }
-    }
-
-    // set the current follower id
-    followerId := req.GetInit().FollowerId
-    atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
-
-    glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
-    isConnected := true
-    sleepIntervalCount := 0
-
-    var counter int64
-    defer func() {
-        isConnected = false
-        glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
-    }()
-
-    // send first hello message
-    // to indicate the follower is connected
-    stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
-        Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
-            Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
-        },
-    })
-
-    var startPosition log_buffer.MessagePosition
-    if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
-        startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
-    }
-
-    var prevFlushTsNs int64
-
-    _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
-        if !isConnected {
-            return false
-        }
-        sleepIntervalCount++
-        if sleepIntervalCount > 32 {
-            sleepIntervalCount = 32
-        }
-        time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
-
-        if localTopicPartition.LogBuffer.IsStopping() {
-            newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
-            glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
-            stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
-                Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
-                    Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
-                        FollowerChangedToId: newFollowerId,
-                    },
-                },
-            })
-            return false
-        }
-
-        // Check if the client has disconnected by monitoring the context
-        select {
-        case <-ctx.Done():
-            err := ctx.Err()
-            if err == context.Canceled {
-                // Client disconnected
-                return false
-            }
-            glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
-            return false
-        default:
-            // Continue processing the request
-        }
-
-        // send the last flushed sequence
-        flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
-        if flushTsNs != prevFlushTsNs {
-            prevFlushTsNs = flushTsNs
-            stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
-                Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
-                    Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
-                        FlushedSequence: flushTsNs,
-                    },
-                },
-            })
-        }
-
-        return true
-    }, func(logEntry *filer_pb.LogEntry) (bool, error) {
-        // reset the sleep interval count
-        sleepIntervalCount = 0
-
-        // check the follower id
-        newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
-        if newFollowerId != followerId {
-            glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
-            stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
-                Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
-                    Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
-                        FollowerChangedToId: newFollowerId,
-                    },
-                },
-            })
-            return true, nil
-        }
-
-        // send the last flushed sequence
-        flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
-        if flushTsNs != prevFlushTsNs {
-            prevFlushTsNs = flushTsNs
-            stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
-                Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
-                    Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
-                        FlushedSequence: flushTsNs,
-                    },
-                },
-            })
-        }
-
-        // send the log entry
-        if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
-            Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
-                Data: &mq_pb.DataMessage{
-                    Key:   logEntry.Key,
-                    Value: logEntry.Data,
-                    TsNs:  logEntry.TsNs,
-                },
-            }}); err != nil {
-            glog.Errorf("Error sending setup response: %v", err)
-            return false, err
-        }
-
-        counter++
-        return false, nil
-    })
-
-    return err
-}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 89c221af5..3fd97f1c2 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -39,18 +39,11 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
     go func() {
         // try to load the partition assignment from filer
         if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil {
-            assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(conf.BrokerPartitionAssignments))
-            for i, assignment := range conf.BrokerPartitionAssignments {
-                assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
-                    Partition: assignment.Partition,
-                    Broker:    assignment.LeaderBroker,
-                }
-            }
             // send partition assignment to subscriber
             cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
                 Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
                     Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
-                        AssignedPartitions: assignedPartitions,
+                        PartitionAssignments: conf.BrokerPartitionAssignments,
                    },
                },
            }
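After this change the coordinator hands out the same mq_pb.BrokerPartitionAssignment message that the publish path already uses, rather than converting into the now-removed AssignedPartition type. A minimal construction of such a response, assuming the generated mq_pb types match the fields used throughout this diff:

    package main

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    )

    func main() {
        // one assignment message shared by both the publish and subscribe paths
        assignment := &mq_pb.BrokerPartitionAssignment{
            LeaderBroker: "localhost:17777",
            Partition: &mq_pb.Partition{
                RingSize:   1024,
                RangeStart: 0,
                RangeStop:  512,
            },
        }
        resp := &mq_pb.SubscriberToSubCoordinatorResponse{
            Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
                Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
                    PartitionAssignments: []*mq_pb.BrokerPartitionAssignment{assignment},
                },
            },
        }
        fmt.Println(resp)
    }

Reusing the assignment proto is what lets the coordinator forward conf.BrokerPartitionAssignments verbatim, and it carries FollowerBrokers through to subscribers for free.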
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 35d95c0e4..cddd6cf1c 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -56,12 +56,27 @@
     return conf, nil
 }

-func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
+func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
+    // get or generate a local partition
+    conf, readConfErr := b.readTopicConfFromFiler(t)
+    if readConfErr != nil {
+        glog.Errorf("topic %v not found: %v", t, readConfErr)
+        return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
+    }
+    localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
+    if getOrGenError != nil {
+        glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+        return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+    }
+    return localTopicPartition, nil
+}
+
+func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
     b.accessLock.Lock()
     defer b.accessLock.Unlock()

-    if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
-        localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition)
+    if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
+        localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf)
         if err != nil {
             return nil, false, err
         }
@@ -69,16 +84,12 @@
     return localPartition, isGenerated, nil
 }

-func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
+func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
     self := b.option.BrokerAddress()
-    conf, err := b.readTopicConfFromFiler(t)
-    if err != nil {
-        return nil, isGenerated, err
-    }
     for _, assignment := range conf.BrokerPartitionAssignments {
         if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
-            localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
-            b.localTopicManager.AddTopicPartition(t, localPartition)
+            localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+            b.localTopicManager.AddLocalPartition(t, localPartition)
             isGenerated = true
             break
         }
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index a058d8da5..50470f879 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -41,6 +41,16 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
         }

         atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
+
+        b.accessLock.Lock()
+        defer b.accessLock.Unlock()
+        p := topic.FromPbPartition(partition)
+        if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
+            localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
+        }
+
+        println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf))
+
     }
 }
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 2873ba21f..e9227130a 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -16,6 +16,8 @@ var (
     concurrency    = flag.Int("c", 4, "concurrent publishers")
     partitionCount = flag.Int("p", 6, "partition count")

+    clientName = flag.String("client", "c1", "client name")
+
     namespace   = flag.String("ns", "test", "namespace")
     t           = flag.String("t", "test", "t")
     seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
@@ -25,16 +27,20 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
     startTime := time.Now()
     for i := 0; i < *messageCount / *concurrency; i++ {
         // Simulate publishing a message
-        key := []byte(fmt.Sprintf("key-%d-%d", id, i))
-        value := []byte(fmt.Sprintf("value-%d-%d", id, i))
+        key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i))
+        value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i))
         if err := publisher.Publish(key, value); err != nil {
             fmt.Println(err)
             break
         }
+        time.Sleep(time.Second)
         // println("Published", string(key), string(value))
     }
+    if err := publisher.FinishPublish(); err != nil {
+        fmt.Println(err)
+    }
     elapsed := time.Since(startTime)
-    log.Printf("Publisher %d finished in %s", id, elapsed)
+    log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
 }

 func main() {
@@ -44,6 +50,7 @@ func main() {
         CreateTopic:               true,
         CreateTopicPartitionCount: int32(*partitionCount),
         Brokers:                   strings.Split(*seedBrokers, ","),
+        PublisherName:             *clientName,
     }
     publisher := pub_client.NewTopicPublisher(config)
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1c5891049..fbb07b042 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -5,6 +5,7 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
     "github.com/seaweedfs/seaweedfs/weed/util"
+    "time"
 )

 func (p *TopicPublisher) Publish(key, value []byte) error {
@@ -20,5 +21,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
     return inputBuffer.Enqueue(&mq_pb.DataMessage{
         Key:   key,
         Value: value,
+        TsNs:  time.Now().UnixNano(),
     })
 }
+
+func (p *TopicPublisher) FinishPublish() error {
+    if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
+        for _, inputBuffer := range inputBuffers {
+            inputBuffer.Enqueue(&mq_pb.DataMessage{
+                TsNs: time.Now().UnixNano(),
+                Ctrl: &mq_pb.ControlMessage{
+                    IsClose: true,
+                },
+            })
+        }
+    }
+
+    return nil
+}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 9262d6e0c..09984bae3 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -17,6 +17,7 @@ type PublisherConfiguration struct {
     CreateTopic               bool
     CreateTopicPartitionCount int32
     Brokers                   []string
+    PublisherName             string // for debugging
 }

 type PublishClient struct {
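FinishPublish gives the client an explicit end-of-stream: it enqueues a DataMessage whose Ctrl.IsClose flag tells each partition's sender (see scheduler.go below) that no more data is coming. A sketch of the resulting publisher lifecycle, mirroring weed_pub above; it assumes NewTopicPublisher accepts the configuration as constructed there, and elides any setup not shown in this diff:

    package main

    import (
        "log"
        "strings"

        "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
    )

    func main() {
        config := &pub_client.PublisherConfiguration{
            CreateTopic:               true,
            CreateTopicPartitionCount: 6,
            Brokers:                   strings.Split("localhost:17777", ","),
            PublisherName:             "example-client", // for debugging
        }
        publisher := pub_client.NewTopicPublisher(config)

        for i := 0; i < 10; i++ {
            if err := publisher.Publish([]byte("key"), []byte("value")); err != nil {
                log.Fatal(err)
            }
        }
        // enqueue the Ctrl.IsClose control message on every partition buffer
        if err := publisher.FinishPublish(); err != nil {
            log.Println(err)
        }
    }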
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e6caf896c..e92e07ab5 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
     "log"
     "sort"
     "sync"
+    "sync/atomic"
     "time"
 )

@@ -145,11 +146,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
                 Partition:       job.Partition,
                 AckInterval:     128,
                 FollowerBrokers: job.FollowerBrokers,
+                PublisherName:   p.config.PublisherName,
             },
         },
     }); err != nil {
         return fmt.Errorf("send init message: %v", err)
     }
+    // process the hello message
     resp, err := stream.Recv()
     if err != nil {
         return fmt.Errorf("recv init response: %v", err)
@@ -158,31 +161,44 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
         return fmt.Errorf("init response error: %v", resp.Error)
     }

+    var publishedTsNs int64
+    hasMoreData := int32(1)
+    var wg sync.WaitGroup
+    wg.Add(1)
     go func() {
+        defer wg.Done()
         for {
             ackResp, err := publishClient.Recv()
             if err != nil {
                 e, _ := status.FromError(err)
                 if e.Code() == codes.Unknown && e.Message() == "EOF" {
+                    log.Printf("publish to %s EOF", publishClient.Broker)
                     return
                 }
                 publishClient.Err = err
-                fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+                log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
                 return
             }
             if ackResp.Error != "" {
                 publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
-                fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+                log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
                 return
             }
             if ackResp.AckSequence > 0 {
-                log.Printf("ack %d", ackResp.AckSequence)
+                log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+            }
+            if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+                return
             }
         }
     }()

     publishCounter := 0
     for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
+        if data.Ctrl != nil && data.Ctrl.IsClose {
+            // need to set this before sending to brokers, to avoid timing issue
+            atomic.StoreInt32(&hasMoreData, 0)
+        }
         if err := publishClient.Send(&mq_pb.PublishMessageRequest{
             Message: &mq_pb.PublishMessageRequest_Data{
                 Data: data,
@@ -191,14 +207,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
             return fmt.Errorf("send publish data: %v", err)
         }
         publishCounter++
+        atomic.StoreInt64(&publishedTsNs, data.TsNs)
     }
-
-    if err := publishClient.CloseSend(); err != nil {
-        return fmt.Errorf("close send: %v", err)
+    if publishCounter > 0 {
+        wg.Wait()
+    } else {
+        // CloseSend would cancel the context on the server side
+        if err := publishClient.CloseSend(); err != nil {
+            return fmt.Errorf("close send: %v", err)
+        }
     }
-
-    time.Sleep(3 * time.Second)
-
     log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)

     return nil
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 4cc3c8ff2..b0b533e42 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -91,26 +91,26 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
     var wg sync.WaitGroup
     semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)

-    for _, assigned := range assignment.AssignedPartitions {
+    for _, assigned := range assignment.PartitionAssignments {
         wg.Add(1)
         semaphore <- struct{}{}
-        go func(partition *mq_pb.Partition, broker string) {
+        go func(assigned *mq_pb.BrokerPartitionAssignment) {
             defer wg.Done()
             defer func() { <-semaphore }()
-            glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
-            err := sub.onEachPartition(partition, broker)
+            glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+            err := sub.onEachPartition(assigned)
             if err != nil {
-                glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err)
+                glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
             }
-        }(assigned.Partition, assigned.Broker)
+        }(assigned)
     }
     wg.Wait()
 }

-func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error {
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
     // connect to the partition broker
-    return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+    return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
         subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
             Message: &mq_pb.SubscribeMessageRequest_Init{
                 Init: &mq_pb.SubscribeMessageRequest_InitMessage{
@@ -118,11 +118,12 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
                     ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
                     Topic:      sub.ContentConfig.Topic.ToPbTopic(),
                     PartitionOffset: &mq_pb.PartitionOffset{
-                        Partition: partition,
+                        Partition: assigned.Partition,
                         StartTsNs: sub.alreadyProcessedTsNs,
                         StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
                     },
                     Filter: sub.ContentConfig.Filter,
+                    FollowerBrokers: assigned.FollowerBrokers,
                 },
             },
         })
@@ -131,7 +132,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
             return fmt.Errorf("create subscribe client: %v", err)
         }

-        glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
+        glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)

         if sub.OnCompletionFunc != nil {
             defer sub.OnCompletionFunc()
diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go
index 54667d154..122984f0d 100644
--- a/weed/mq/pub_balancer/balance_brokers_test.go
+++ b/weed/mq/pub_balancer/balance_brokers_test.go
@@ -21,8 +21,6 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
             Topic:     topic.Topic{Namespace: "topic1", Name: "topic1"},
             Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024},
         },
-        ConsumerCount: 1,
-        IsLeader:      true,
     })
     broker2Stats := &BrokerStats{
         TopicPartitionCount: 2,
@@ -35,16 +33,12 @@
             Topic:     topic.Topic{Namespace: "topic1", Name: "topic1"},
             Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024},
         },
-        ConsumerCount: 1,
-        IsLeader:      true,
     })
     broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{
         TopicPartition: topic.TopicPartition{
             Topic:     topic.Topic{Namespace: "topic2", Name: "topic2"},
             Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024},
         },
-        ConsumerCount: 1,
-        IsLeader:      true,
     })
     brokers.Set("broker1", broker1Stats)
     brokers.Set("broker2", broker2Stats)
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index b4bb28e42..00f1f80ca 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -9,15 +9,16 @@ import (

 type BrokerStats struct {
     TopicPartitionCount int32
-    ConsumerCount       int32
+    PublisherCount      int32
+    SubscriberCount     int32
     CpuUsagePercent     int32
     TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
     Topics              []topic.Topic
 }
 type TopicPartitionStats struct {
     topic.TopicPartition
-    ConsumerCount int32
-    IsLeader      bool
+    PublisherCount  int32
+    SubscriberCount int32
 }

 func NewBrokerStats() *BrokerStats {
@@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats {
     }
 }
 func (bs *BrokerStats) String() string {
-    return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
-        bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
+    return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
+        bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
 }

 func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
     bs.TopicPartitionCount = int32(len(stats.Stats))
     bs.CpuUsagePercent = stats.CpuUsagePercent

-    var consumerCount int32
+    var publisherCount, subscriberCount int32
     currentTopicPartitions := bs.TopicPartitionStats.Items()
     for _, topicPartitionStats := range stats.Stats {
         tps := &TopicPartitionStats{
@@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
                     UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
                 },
             },
-            ConsumerCount: topicPartitionStats.ConsumerCount,
-            IsLeader:      topicPartitionStats.IsLeader,
+            PublisherCount:  topicPartitionStats.PublisherCount,
+            SubscriberCount: topicPartitionStats.SubscriberCount,
         }
-        consumerCount += topicPartitionStats.ConsumerCount
+        publisherCount += topicPartitionStats.PublisherCount
+        subscriberCount += topicPartitionStats.SubscriberCount
         key := tps.TopicPartition.String()
         bs.TopicPartitionStats.Set(key, tps)
         delete(currentTopicPartitions, key)
@@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
     for key := range currentTopicPartitions {
         bs.TopicPartitionStats.Remove(key)
     }
-    bs.ConsumerCount = consumerCount
-
+    bs.PublisherCount = publisherCount
+    bs.SubscriberCount = subscriberCount
 }

 func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
@@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
                 UnixTimeNs: partition.UnixTimeNs,
             },
         },
-        ConsumerCount: 0,
-        IsLeader:      true,
+        PublisherCount:  0,
+        SubscriberCount: 0,
     }
     key := tps.TopicPartition.String()
     if isAdd {
diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go
index 0ab1a5ea9..0f307c9eb 100644
--- a/weed/mq/pub_balancer/repair.go
+++ b/weed/mq/pub_balancer/repair.go
@@ -14,8 +14,7 @@ func (balancer *Balancer) RepairTopics() []BalanceAction {
 }

 type TopicPartitionInfo struct {
-    Leader    string
-    Followers []string
+    Broker string
 }

 // RepairMissingTopicPartitions check the stats of all brokers,
@@ -38,11 +37,7 @@ func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStat
                 tpi = &TopicPartitionInfo{}
                 topicPartitionToInfo[topicPartitionStat.Partition] = tpi
             }
-            if topicPartitionStat.IsLeader {
-                tpi.Leader = broker
-            } else {
-                tpi.Followers = append(tpi.Followers, broker)
-            }
+            tpi.Broker = broker
         }
     }
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index f897fe2b3..d24a38d8a 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -103,22 +103,22 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
             partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
         }
         consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots)
-        assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots))
+        assignedPartitions := make([]*mq_pb.BrokerPartitionAssignment, len(partitionSlots))
         for i, partitionSlot := range partitionSlots {
-            assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
+            assignedPartitions[i] = &mq_pb.BrokerPartitionAssignment{
                 Partition: &mq_pb.Partition{
                     RangeStop:  partitionSlot.RangeStop,
                     RangeStart: partitionSlot.RangeStart,
                     RingSize:   partitionSlotToBrokerList.RingSize,
                     UnixTimeNs: partitionSlot.UnixTimeNs,
                 },
-                Broker: partitionSlot.Broker,
+                LeaderBroker: partitionSlot.Broker,
             }
         }
         response := &mq_pb.SubscriberToSubCoordinatorResponse{
             Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
                 Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
-                    AssignedPartitions: assignedPartitions,
+                    PartitionAssignments: assignedPartitions,
                },
            },
        }
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index aa2eefcdc..79a84561c 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager {
     }
 }

-// AddTopicPartition adds a topic to the local topic manager
-func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
+// AddLocalPartition adds a topic to the local topic manager
+func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) {
     localTopic, ok := manager.topics.Get(topic.String())
     if !ok {
         localTopic = NewLocalTopic(topic)
@@ -34,8 +34,8 @@
     localTopic.Partitions = append(localTopic.Partitions, localPartition)
 }

-// GetTopicPartition gets a topic from the local topic manager
-func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
+// GetLocalPartition gets a topic from the local topic manager
+func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition {
     localTopic, ok := manager.topics.Get(topic.String())
     if !ok {
         return nil
@@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
     manager.topics.Remove(topic.String())
 }

-func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
+func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) {
     localTopic, ok := manager.topics.Get(topic.String())
     if !ok {
         return false
@@ -96,8 +96,9 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
                 Namespace: string(localTopic.Namespace),
                 Name:      localTopic.Name,
             },
-            Partition:     localPartition.Partition.ToPbPartition(),
-            ConsumerCount: localPartition.ConsumerCount,
+            Partition:       localPartition.Partition.ToPbPartition(),
+            PublisherCount:  int32(localPartition.Publishers.Size()),
+            SubscriberCount: int32(localPartition.Subscribers.Size()),
         }
         // fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition)
     }
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 798949736..54c122a0f 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -1,42 +1,74 @@
 package topic

 import (
+    "context"
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
     "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+    "sync"
     "sync/atomic"
     "time"
 )

 type LocalPartition struct {
+    ListenersWaits int64
+    AckTsNs        int64
+
+    // notifying clients
+    ListenersLock sync.Mutex
+    ListenersCond *sync.Cond
+
     Partition
-    isLeader        bool
-    FollowerBrokers []pb.ServerAddress
     LogBuffer   *log_buffer.LogBuffer
-    ConsumerCount int32
     Publishers  *LocalPartitionPublishers
     Subscribers *LocalPartitionSubscribers
-    FollowerId  int32
+
+    followerStream         mq_pb.SeaweedMessaging_PublishFollowMeClient
+    followerGrpcConnection *grpc.ClientConn
+    follower               string
 }

 var TIME_FORMAT = "2006-01-02-15-04-05"

-func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
-    return &LocalPartition{
+func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
+    lp := &LocalPartition{
         Partition:   partition,
-        isLeader:        isLeader,
-        FollowerBrokers: followerBrokers,
-        LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
-            2*time.Minute, logFlushFn, readFromDiskFn, func() {}),
         Publishers:  NewLocalPartitionPublishers(),
         Subscribers: NewLocalPartitionSubscribers(),
     }
+    lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+    lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
+        2*time.Minute, logFlushFn, readFromDiskFn, func() {
+            if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
+                lp.ListenersCond.Broadcast()
+            }
+        })
+    return lp
 }

-func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
-    p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
+    p.LogBuffer.AddToBuffer(message)
+
+    // maybe send to the follower
+    if p.followerStream != nil {
+        // println("recv", string(message.Key), message.TsNs)
+        if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+            Message: &mq_pb.PublishFollowMeRequest_Data{
+                Data: message,
+            },
+        }); followErr != nil {
+            return fmt.Errorf("send to follower %s: %v", p.follower, followErr)
+        }
+    } else {
+        atomic.StoreInt64(&p.AckTsNs, message.TsNs)
+    }
+
+    return nil
 }

 func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
@@ -85,15 +117,6 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
     return p.LogBuffer.GetEarliestPosition()
 }

-func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
-    isLeader := assignment.LeaderBroker == string(self)
-    followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
-    for i, followerBroker := range assignment.FollowerBrokers {
-        followers[i] = pb.ServerAddress(followerBroker)
-    }
-    return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn)
-}
-
 func (p *LocalPartition) closePublishers() {
     p.Publishers.SignalShutdown()
 }
@@ -103,18 +126,93 @@ func (p *LocalPartition) closeSubscribers() {

 func (p *LocalPartition) WaitUntilNoPublishers() {
     for {
-        if p.Publishers.IsEmpty() {
+        if p.Publishers.Size() == 0 {
             return
         }
         time.Sleep(113 * time.Millisecond)
     }
 }

+func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
+    if p.followerStream != nil {
+        return nil
+    }
+    if len(initMessage.FollowerBrokers) == 0 {
+        return nil
+    }
+
+    p.follower = initMessage.FollowerBrokers[0]
+    ctx := context.Background()
+    p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
+    if err != nil {
+        return fmt.Errorf("fail to dial %s: %v", p.follower, err)
+    }
+    followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
+    p.followerStream, err = followerClient.PublishFollowMe(ctx)
+    if err != nil {
+        return fmt.Errorf("fail to create publish client: %v", err)
+    }
+    if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+        Message: &mq_pb.PublishFollowMeRequest_Init{
+            Init: &mq_pb.PublishFollowMeRequest_InitMessage{
+                Topic:     initMessage.Topic,
+                Partition: initMessage.Partition,
+            },
+        },
+    }); err != nil {
+        return err
+    }
+
+    // start receiving ack from follower
+    go func() {
+        defer func() {
+            // println("stop receiving ack from follower")
+        }()
+
+        for {
+            ack, err := p.followerStream.Recv()
+            if err != nil {
+                e, _ := status.FromError(err)
+                if e.Code() == codes.Canceled {
+                    glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.follower)
+                    return
+                }
+                glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.follower, err)
+                return
+            }
+            atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
+            // println("recv ack", ack.AckTsNs)
+        }
+    }()
+    return nil
+}
+
 func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
-    if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
+
+    if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
         p.LogBuffer.ShutdownLogBuffer()
+        for !p.LogBuffer.IsAllFlushed() {
+            time.Sleep(113 * time.Millisecond)
+        }
+        if p.followerStream != nil {
+            // send close to the follower
+            if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+                Message: &mq_pb.PublishFollowMeRequest_Close{
+                    Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+                },
+            }); followErr != nil {
+                glog.Errorf("Error closing follower stream: %v", followErr)
+            }
+            glog.V(4).Infof("closing grpcConnection to follower")
+            p.followerGrpcConnection.Close()
+            p.followerStream = nil
+            p.follower = ""
+        }
+
         hasShutdown = true
     }
+
+    glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.follower, hasShutdown)
     return
 }

@@ -122,5 +220,20 @@ func (p *LocalPartition) Shutdown() {
     p.closePublishers()
     p.closeSubscribers()
     p.LogBuffer.ShutdownLogBuffer()
-    atomic.StoreInt32(&p.FollowerId, 0)
+    glog.V(0).Infof("local partition %v shutting down", p.Partition)
+}
+
+func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
+    if p.followerStream != nil {
+        if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+            Message: &mq_pb.PublishFollowMeRequest_Flush{
+                Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
+                    TsNs: flushTsNs,
+                },
+            },
+        }); followErr != nil {
+            glog.Errorf("send follower %s flush message: %v", p.follower, followErr)
+        }
+        // println("notifying", p.follower, "flushed at", flushTsNs)
+    }
 }
diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go
index c12f66336..e3c4e3ca6 100644
--- a/weed/mq/topic/local_partition_publishers.go
+++ b/weed/mq/topic/local_partition_publishers.go
@@ -44,13 +44,6 @@ func (p *LocalPartitionPublishers) SignalShutdown() {
     }
 }

-func (p *LocalPartitionPublishers) IsEmpty() bool {
-    p.publishersLock.RLock()
-    defer p.publishersLock.RUnlock()
-
-    return len(p.publishers) == 0
-}
-
 func (p *LocalPartitionPublishers) Size() int {
     p.publishersLock.RLock()
     defer p.publishersLock.RUnlock()
diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go
index d3b989d72..24341ce7e 100644
--- a/weed/mq/topic/local_partition_subscribers.go
+++ b/weed/mq/topic/local_partition_subscribers.go
@@ -48,13 +48,6 @@ func (p *LocalPartitionSubscribers) SignalShutdown() {
     }
 }

-func (p *LocalPartitionSubscribers) IsEmpty() bool {
-    p.SubscribersLock.RLock()
-    defer p.SubscribersLock.RUnlock()
-
-    return len(p.Subscribers) == 0
-}
-
 func (p *LocalPartitionSubscribers) Size() int {
     p.SubscribersLock.RLock()
     defer p.SubscribersLock.RUnlock()
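Taken together, the LocalPartition changes make AckTsNs the single source of ack truth: with a follower attached, it only advances when the follower's ack arrives over the PublishFollowMe stream; without one, Publish advances it as soon as the message is buffered. A reduced, runnable sketch of that decision (hypothetical helper, not the broker API; the real code sends a PublishFollowMeRequest):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // publish buffers locally (LogBuffer.AddToBuffer in the real code), then
    // either forwards to the follower or acks immediately.
    func publish(tsNs int64, sendToFollower func(int64) error, ackTsNs *int64) error {
        if sendToFollower != nil {
            return sendToFollower(tsNs) // the follower's ack stream advances ackTsNs later
        }
        atomic.StoreInt64(ackTsNs, tsNs) // no follower: the leader is the replication boundary
        return nil
    }

    func main() {
        var ackTsNs int64
        follower := func(tsNs int64) error {
            atomic.StoreInt64(&ackTsNs, tsNs) // stand-in for the async ack receiver goroutine
            return nil
        }
        _ = publish(100, follower, &ackTsNs)
        _ = publish(200, nil, &ackTsNs)
        fmt.Println("acked up to", atomic.LoadInt64(&ackTsNs))
    }

This is also why MaybeShutdownLocalPartition now sends an explicit Close message and waits for IsAllFlushed: the follower must see the full stream, including control messages, before the leader tears the connection down.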
