diff options
| -rw-r--r-- | weed/command/msg_broker.go | 4 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_append.go (renamed from weed/messaging/broker_append.go) | 2 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server.go (renamed from weed/messaging/broker_grpc_server.go) | 2 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go (renamed from weed/messaging/broker_grpc_server_publish.go) | 51 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 88 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go (renamed from weed/messaging/broker_server.go) | 4 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_lock.go | 80 | ||||
| -rw-r--r-- | weed/messaging/broker_grpc_server_subscribe.go | 9 |
8 files changed, 205 insertions, 35 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 36d164800..67ebdfb6d 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -9,7 +9,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/messaging" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -80,7 +80,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { } } - qs, err := messaging.NewMessageBroker(&messaging.MessageBrokerOption{ + qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ Filers: []string{*msgBrokerOpt.filer}, DefaultReplication: "", MaxMB: 0, diff --git a/weed/messaging/broker_append.go b/weed/messaging/broker/broker_append.go index a0c0e8614..7194dfcfc 100644 --- a/weed/messaging/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -1,4 +1,4 @@ -package messaging +package broker import ( "context" diff --git a/weed/messaging/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go index 0d1eb72ac..447620a6b 100644 --- a/weed/messaging/broker_grpc_server.go +++ b/weed/messaging/broker/broker_grpc_server.go @@ -1,4 +1,4 @@ -package messaging +package broker import ( "context" diff --git a/weed/messaging/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index db3bf0764..20e6eb04b 100644 --- a/weed/messaging/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -1,4 +1,4 @@ -package messaging +package broker import ( "fmt" @@ -10,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" ) func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error { @@ -23,15 +22,36 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis if err != nil { return err } - namespace, topic, partition := in.Init.Namespace, in.Init.Topic, in.Init.Partition - - updatesChan := make(chan int32) // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ } + // get lock + tp := TopicPartition{ + Namespace: in.Init.Namespace, + Topic: in.Init.Topic, + Partition: in.Init.Partition, + } + logBuffer := broker.topicLocks.RequestPublisherLock(tp, func(startTime, stopTime time.Time, buf []byte) { + + targetFile := fmt.Sprintf( + "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", + filer2.TopicsDir, tp.Namespace, tp.Topic, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + tp.Partition, + ) + + if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil { + glog.V(0).Infof("log write failed %s: %v", targetFile, err) + } + + }) + defer broker.topicLocks.ReleaseLock(tp, true) + + updatesChan := make(chan int32) + go func() { for update := range updatesChan { if err := stream.Send(&messaging_pb.PublishResponse{ @@ -45,23 +65,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis } }() - logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) { - - targetFile := fmt.Sprintf( - "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", - filer2.TopicsDir, namespace, topic, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - partition, - ) - - if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil { - glog.V(0).Infof("log write failed %s: %v", targetFile, err) - } - - }, func() { - // notify subscribers - }) + // process each message for { in, err := stream.Recv() if err == io.EOF { @@ -71,6 +76,10 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis return err } + if in.Data == nil { + continue + } + m := &messaging_pb.Message{ Timestamp: time.Now().UnixNano(), Key: in.Data.Key, diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go new file mode 100644 index 000000000..5a3c4f785 --- /dev/null +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -0,0 +1,88 @@ +package broker + +import ( + "io" + "sync" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { + + // process initial request + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + subscriberId := in.Init.SubscriberId + + // get lock + tp := TopicPartition{ + Namespace: in.Init.Namespace, + Topic: in.Init.Topic, + Partition: in.Init.Partition, + } + lock := broker.topicLocks.RequestSubscriberLock(tp) + defer broker.topicLocks.ReleaseLock(tp, false) + cond := sync.NewCond(&lock.Mutex) + + lastReadTime := time.Now() + switch in.Init.StartPosition { + case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP: + lastReadTime = time.Unix(0, in.Init.TimestampNs) + case messaging_pb.SubscriberMessage_InitMessage_LATEST: + case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: + } + + // how to process each message + // an error returned will end the subscription + eachMessageFn := func(m *messaging_pb.Message) error { + err := stream.Send(&messaging_pb.BrokerMessage{ + Data: m, + }) + if err != nil { + glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) + } + return err + } + + // loop through all messages + for { + + _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime) + + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + entryData := buf[pos+4 : pos+4+int(size)] + + m := &messaging_pb.Message{} + if err = proto.Unmarshal(entryData, m); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + pos += 4 + int(size) + continue + } + + if err = eachMessageFn(m); err != nil { + return err + } + + lastReadTime = time.Unix(0, m.Timestamp) + pos += 4 + int(size) + } + + lock.Mutex.Lock() + cond.Wait() + lock.Mutex.Unlock() + } + +} diff --git a/weed/messaging/broker_server.go b/weed/messaging/broker/broker_server.go index bc842eeea..0522eb4b7 100644 --- a/weed/messaging/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -1,4 +1,4 @@ -package messaging +package broker import ( "context" @@ -23,6 +23,7 @@ type MessageBrokerOption struct { type MessageBroker struct { option *MessageBrokerOption grpcDialOption grpc.DialOption + topicLocks *TopicLocks } func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { @@ -30,6 +31,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio messageBroker = &MessageBroker{ option: option, grpcDialOption: grpcDialOption, + topicLocks: NewTopicLocks(), } go messageBroker.loopForEver() diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go new file mode 100644 index 000000000..9e4ea6824 --- /dev/null +++ b/weed/messaging/broker/topic_lock.go @@ -0,0 +1,80 @@ +package broker + +import ( + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +type TopicPartition struct { + Namespace string + Topic string + Partition int32 +} +type TopicLock struct { + sync.Mutex + subscriberCount int + publisherCount int + logBuffer *log_buffer.LogBuffer +} + +type TopicLocks struct { + sync.Mutex + locks map[TopicPartition]*TopicLock +} + +func NewTopicLocks() *TopicLocks { + return &TopicLocks{ + locks: make(map[TopicPartition]*TopicLock), + } +} + +func (tl *TopicLocks) RequestSubscriberLock(partition TopicPartition) *TopicLock { + tl.Lock() + defer tl.Unlock() + + lock, found := tl.locks[partition] + if !found { + lock = &TopicLock{} + tl.locks[partition] = lock + } + lock.subscriberCount++ + + return lock +} + +func (tl *TopicLocks) RequestPublisherLock(partition TopicPartition, flushFn func(startTime, stopTime time.Time, buf []byte)) *log_buffer.LogBuffer { + tl.Lock() + defer tl.Unlock() + + lock, found := tl.locks[partition] + if !found { + lock = &TopicLock{} + tl.locks[partition] = lock + } + lock.publisherCount++ + cond := sync.NewCond(&lock.Mutex) + lock.logBuffer = log_buffer.NewLogBuffer(time.Minute, flushFn, func() { + cond.Broadcast() + }) + return lock.logBuffer +} + +func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { + tl.Lock() + defer tl.Unlock() + + lock, found := tl.locks[partition] + if !found { + return + } + if isPublisher { + lock.publisherCount-- + } else { + lock.subscriberCount-- + } + if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { + delete(tl.locks, partition) + } +} diff --git a/weed/messaging/broker_grpc_server_subscribe.go b/weed/messaging/broker_grpc_server_subscribe.go deleted file mode 100644 index 137fcac8a..000000000 --- a/weed/messaging/broker_grpc_server_subscribe.go +++ /dev/null @@ -1,9 +0,0 @@ -package messaging - -import ( - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) Subscribe(server messaging_pb.SeaweedMessaging_SubscribeServer) error { - panic("implement me") -} |
