aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go34
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go78
-rw-r--r--weed/mq/broker/broker_grpc_sub.go192
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go3
-rw-r--r--weed/mq/topic/local_partition.go18
-rw-r--r--weed/mq/topic/local_topic.go1
6 files changed, 293 insertions, 33 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 17d01f620..8c46ea99d 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
"io"
@@ -59,6 +60,21 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
+ for _, follower := range initMessage.FollowerBrokers {
+ followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
+ BrokerSelf: string(b.option.BrokerAddress()),
+ })
+ return err
+ })
+ if followErr != nil {
+ response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
+ glog.Errorf("follower %v failed: %v", follower, followErr)
+ return stream.Send(response)
+ }
+ }
stream.Send(response)
} else {
response.Error = fmt.Sprintf("missing init message")
@@ -86,21 +102,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence)
}()
go func() {
- for {
- select {
- case resp := <-respChan:
- if resp != nil {
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending response %v: %v", resp, err)
- }
- } else {
- return
- }
- case <-localTopicPartition.StopPublishersCh:
- respChan <- &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- ShouldClose: true,
- }
+ for resp := range respChan {
+ if err := stream.Send(resp); err != nil {
+ glog.Errorf("Error sending response %v: %v", resp, err)
}
}
}()
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
new file mode 100644
index 000000000..e74d7025f
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -0,0 +1,78 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "io"
+ "math/rand"
+ "time"
+)
+
+func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){
+ glog.V(0).Infof("PublishFollowMe %v", request)
+ go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
+ followerId := rand.Int31()
+ subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
+ Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
+ Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
+ ConsumerGroup: string(b.option.BrokerAddress()),
+ ConsumerId: fmt.Sprintf("followMe-%d", followerId),
+ FollowerId: followerId,
+ Topic: request.Topic,
+ PartitionOffset: &mq_pb.PartitionOffset{
+ Partition: request.Partition,
+ StartTsNs: 0,
+ StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
+ },
+ },
+ },
+ })
+ if err != nil {
+ glog.Errorf("FollowInMemoryMessages error: %v", err)
+ return err
+ }
+
+ b.doFollowInMemoryMessage(context.Background(), subscribeClient)
+
+ return nil
+ })
+ return &mq_pb.PublishFollowMeResponse{}, nil
+}
+
+func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
+ for {
+ resp, err := client.Recv()
+ if err != nil {
+ if err != io.EOF {
+ glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
+ }
+ return
+ }
+ if resp == nil {
+ glog.V(0).Infof("doFollowInMemoryMessage nil response")
+ return
+ }
+ if resp.Message != nil {
+ // process ctrl message or data message
+ switch m:= resp.Message.(type) {
+ case *mq_pb.FollowInMemoryMessagesResponse_Data:
+ // process data message
+ print("d")
+ case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
+ // process ctrl message
+ if m.Ctrl.FlushedSequence > 0 {
+ flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
+ glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
+ }
+ if m.Ctrl.FollowerChangedToId != 0 {
+ // follower changed
+ glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
+ return
+ }
+ }
+ }
+ }
+}
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index c6dde6f4e..3280be2c0 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "sync/atomic"
"time"
)
@@ -69,15 +70,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
- offset := req.GetInit().GetPartitionOffset()
- if offset.StartTsNs != 0 {
- startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
- }
- if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
- startPosition = log_buffer.NewMessagePosition(1, -3)
- } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
- startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
- }
+ startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
}
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
@@ -85,10 +78,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
return false
}
sleepIntervalCount++
- if sleepIntervalCount > 10 {
- sleepIntervalCount = 10
+ if sleepIntervalCount > 32 {
+ sleepIntervalCount = 32
}
- time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond)
+ time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
@@ -116,6 +109,179 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
TsNs: logEntry.TsNs,
},
}}); err != nil {
+ glog.Errorf("Error sending data: %v", err)
+ return false, err
+ }
+
+ counter++
+ return false, nil
+ })
+}
+
+func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
+ if offset.StartTsNs != 0 {
+ startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
+ }
+ if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
+ startPosition = log_buffer.NewMessagePosition(1, -3)
+ } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
+ startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
+ }
+ return
+}
+
+func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
+ ctx := stream.Context()
+ clientName := req.GetInit().ConsumerId
+
+ t := topic.FromPbTopic(req.GetInit().Topic)
+ partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
+
+ glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
+
+ waitIntervalCount := 0
+
+ var localTopicPartition *topic.LocalPartition
+ for localTopicPartition == nil {
+ localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
+ if err != nil {
+ glog.V(1).Infof("topic %v partition %v not setup", t, partition)
+ }
+ if localTopicPartition != nil {
+ break
+ }
+ waitIntervalCount++
+ if waitIntervalCount > 32 {
+ waitIntervalCount = 32
+ }
+ time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return nil
+ }
+ glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
+ return nil
+ default:
+ // Continue processing the request
+ }
+ }
+
+ // set the current follower id
+ followerId := req.GetInit().FollowerId
+ atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
+
+ glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
+ isConnected := true
+ sleepIntervalCount := 0
+
+ var counter int64
+ defer func() {
+ isConnected = false
+ glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
+ }()
+
+ var startPosition log_buffer.MessagePosition
+ if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
+ startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
+ }
+
+ var prevFlushTsNs int64
+
+ _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
+ if !isConnected {
+ return false
+ }
+ sleepIntervalCount++
+ if sleepIntervalCount > 32 {
+ sleepIntervalCount = 32
+ }
+ time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
+
+ if localTopicPartition.LogBuffer.IsStopping() {
+ newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
+ glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FollowerChangedToId: newFollowerId,
+ },
+ },
+ })
+ return false
+ }
+
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return false
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return false
+ default:
+ // Continue processing the request
+ }
+
+ // send the last flushed sequence
+ flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
+ if flushTsNs != prevFlushTsNs {
+ prevFlushTsNs = flushTsNs
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FlushedSequence: flushTsNs,
+ },
+ },
+ })
+ }
+
+ return true
+ }, func(logEntry *filer_pb.LogEntry) (bool, error) {
+ // reset the sleep interval count
+ sleepIntervalCount = 0
+
+ // check the follower id
+ newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
+ if newFollowerId != followerId {
+ glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FollowerChangedToId: newFollowerId,
+ },
+ },
+ })
+ return true, nil
+ }
+
+ // send the last flushed sequence
+ flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
+ if flushTsNs != prevFlushTsNs {
+ prevFlushTsNs = flushTsNs
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FlushedSequence: flushTsNs,
+ },
+ },
+ })
+ }
+
+ // send the log entry
+ if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
+ Data: &mq_pb.DataMessage{
+ Key: logEntry.Key,
+ Value: logEntry.Data,
+ TsNs: logEntry.TsNs,
+ },
+ }}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
return false, err
}
@@ -123,4 +289,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
counter++
return false, nil
})
+
+ return err
}
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index 4ebb62000..a058d8da5 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto"
"math"
+ "sync/atomic"
"time"
)
@@ -38,6 +39,8 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
break
}
}
+
+ atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
}
}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 0947d259b..062f3f4bd 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "sync/atomic"
"time"
)
@@ -15,10 +16,9 @@ type LocalPartition struct {
FollowerBrokers []pb.ServerAddress
LogBuffer *log_buffer.LogBuffer
ConsumerCount int32
- StopPublishersCh chan struct{}
Publishers *LocalPartitionPublishers
- StopSubscribersCh chan struct{}
Subscribers *LocalPartitionSubscribers
+ FollowerId int32
}
var TIME_FORMAT = "2006-01-02-15-04-05"
@@ -58,6 +58,9 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
startPosition = processedPosition
processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
+ if isDone {
+ return nil
+ }
startPosition = processedPosition
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
@@ -67,9 +70,6 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
return readInMemoryLogErr
}
- if isDone {
- return nil
- }
}
}
@@ -96,7 +96,6 @@ func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition,
func (p *LocalPartition) closePublishers() {
p.Publishers.SignalShutdown()
- close(p.StopPublishersCh)
}
func (p *LocalPartition) closeSubscribers() {
p.Subscribers.SignalShutdown()
@@ -118,3 +117,10 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
}
return
}
+
+func (p *LocalPartition) Shutdown() {
+ p.closePublishers()
+ p.closeSubscribers()
+ p.LogBuffer.ShutdownLogBuffer()
+ atomic.StoreInt32(&p.FollowerId, 0)
+}
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
index 7825d2168..8ab2a0db5 100644
--- a/weed/mq/topic/local_topic.go
+++ b/weed/mq/topic/local_topic.go
@@ -27,6 +27,7 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
for i, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) {
foundPartitionIndex = i
+ localPartition.Shutdown()
break
}
}