aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-18 01:12:01 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-18 01:12:01 -0700
commit11f5a6d91346e5f3cbf3b46e0a660e231c5c2998 (patch)
tree9cf930e9d832023c0a00345f9f95f62297ea5441
parent5bea77010f2b290398c46a517d40fa7ad559dfed (diff)
downloadseaweedfs-11f5a6d91346e5f3cbf3b46e0a660e231c5c2998.tar.xz
seaweedfs-11f5a6d91346e5f3cbf3b46e0a660e231c5c2998.zip
messaging can compile now
-rw-r--r--weed/command/msg_broker.go4
-rw-r--r--weed/messaging/broker/broker_append.go (renamed from weed/messaging/broker_append.go)2
-rw-r--r--weed/messaging/broker/broker_grpc_server.go (renamed from weed/messaging/broker_grpc_server.go)2
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go (renamed from weed/messaging/broker_grpc_server_publish.go)51
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go88
-rw-r--r--weed/messaging/broker/broker_server.go (renamed from weed/messaging/broker_server.go)4
-rw-r--r--weed/messaging/broker/topic_lock.go80
-rw-r--r--weed/messaging/broker_grpc_server_subscribe.go9
8 files changed, 205 insertions, 35 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 36d164800..67ebdfb6d 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -9,7 +9,7 @@ import (
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/messaging"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
@@ -80,7 +80,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
}
}
- qs, err := messaging.NewMessageBroker(&messaging.MessageBrokerOption{
+ qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
Filers: []string{*msgBrokerOpt.filer},
DefaultReplication: "",
MaxMB: 0,
diff --git a/weed/messaging/broker_append.go b/weed/messaging/broker/broker_append.go
index a0c0e8614..7194dfcfc 100644
--- a/weed/messaging/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -1,4 +1,4 @@
-package messaging
+package broker
import (
"context"
diff --git a/weed/messaging/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
index 0d1eb72ac..447620a6b 100644
--- a/weed/messaging/broker_grpc_server.go
+++ b/weed/messaging/broker/broker_grpc_server.go
@@ -1,4 +1,4 @@
-package messaging
+package broker
import (
"context"
diff --git a/weed/messaging/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index db3bf0764..20e6eb04b 100644
--- a/weed/messaging/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -1,4 +1,4 @@
-package messaging
+package broker
import (
"fmt"
@@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
@@ -23,15 +22,36 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
if err != nil {
return err
}
- namespace, topic, partition := in.Init.Namespace, in.Init.Topic, in.Init.Partition
-
- updatesChan := make(chan int32)
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
}
+ // get lock
+ tp := TopicPartition{
+ Namespace: in.Init.Namespace,
+ Topic: in.Init.Topic,
+ Partition: in.Init.Partition,
+ }
+ logBuffer := broker.topicLocks.RequestPublisherLock(tp, func(startTime, stopTime time.Time, buf []byte) {
+
+ targetFile := fmt.Sprintf(
+ "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ filer2.TopicsDir, tp.Namespace, tp.Topic,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ tp.Partition,
+ )
+
+ if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+
+ })
+ defer broker.topicLocks.ReleaseLock(tp, true)
+
+ updatesChan := make(chan int32)
+
go func() {
for update := range updatesChan {
if err := stream.Send(&messaging_pb.PublishResponse{
@@ -45,23 +65,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
}
}()
- logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
-
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer2.TopicsDir, namespace, topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- partition,
- )
-
- if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
-
- }, func() {
- // notify subscribers
- })
+ // process each message
for {
in, err := stream.Recv()
if err == io.EOF {
@@ -71,6 +76,10 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
return err
}
+ if in.Data == nil {
+ continue
+ }
+
m := &messaging_pb.Message{
Timestamp: time.Now().UnixNano(),
Key: in.Data.Key,
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
new file mode 100644
index 000000000..5a3c4f785
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -0,0 +1,88 @@
+package broker
+
+import (
+ "io"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
+
+ // process initial request
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ subscriberId := in.Init.SubscriberId
+
+ // get lock
+ tp := TopicPartition{
+ Namespace: in.Init.Namespace,
+ Topic: in.Init.Topic,
+ Partition: in.Init.Partition,
+ }
+ lock := broker.topicLocks.RequestSubscriberLock(tp)
+ defer broker.topicLocks.ReleaseLock(tp, false)
+ cond := sync.NewCond(&lock.Mutex)
+
+ lastReadTime := time.Now()
+ switch in.Init.StartPosition {
+ case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
+ lastReadTime = time.Unix(0, in.Init.TimestampNs)
+ case messaging_pb.SubscriberMessage_InitMessage_LATEST:
+ case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
+ }
+
+ // how to process each message
+ // an error returned will end the subscription
+ eachMessageFn := func(m *messaging_pb.Message) error {
+ err := stream.Send(&messaging_pb.BrokerMessage{
+ Data: m,
+ })
+ if err != nil {
+ glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
+ }
+ return err
+ }
+
+ // loop through all messages
+ for {
+
+ _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime)
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ m := &messaging_pb.Message{}
+ if err = proto.Unmarshal(entryData, m); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+
+ if err = eachMessageFn(m); err != nil {
+ return err
+ }
+
+ lastReadTime = time.Unix(0, m.Timestamp)
+ pos += 4 + int(size)
+ }
+
+ lock.Mutex.Lock()
+ cond.Wait()
+ lock.Mutex.Unlock()
+ }
+
+}
diff --git a/weed/messaging/broker_server.go b/weed/messaging/broker/broker_server.go
index bc842eeea..0522eb4b7 100644
--- a/weed/messaging/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -1,4 +1,4 @@
-package messaging
+package broker
import (
"context"
@@ -23,6 +23,7 @@ type MessageBrokerOption struct {
type MessageBroker struct {
option *MessageBrokerOption
grpcDialOption grpc.DialOption
+ topicLocks *TopicLocks
}
func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
@@ -30,6 +31,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
messageBroker = &MessageBroker{
option: option,
grpcDialOption: grpcDialOption,
+ topicLocks: NewTopicLocks(),
}
go messageBroker.loopForEver()
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
new file mode 100644
index 000000000..9e4ea6824
--- /dev/null
+++ b/weed/messaging/broker/topic_lock.go
@@ -0,0 +1,80 @@
+package broker
+
+import (
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+type TopicPartition struct {
+ Namespace string
+ Topic string
+ Partition int32
+}
+type TopicLock struct {
+ sync.Mutex
+ subscriberCount int
+ publisherCount int
+ logBuffer *log_buffer.LogBuffer
+}
+
+type TopicLocks struct {
+ sync.Mutex
+ locks map[TopicPartition]*TopicLock
+}
+
+func NewTopicLocks() *TopicLocks {
+ return &TopicLocks{
+ locks: make(map[TopicPartition]*TopicLock),
+ }
+}
+
+func (tl *TopicLocks) RequestSubscriberLock(partition TopicPartition) *TopicLock {
+ tl.Lock()
+ defer tl.Unlock()
+
+ lock, found := tl.locks[partition]
+ if !found {
+ lock = &TopicLock{}
+ tl.locks[partition] = lock
+ }
+ lock.subscriberCount++
+
+ return lock
+}
+
+func (tl *TopicLocks) RequestPublisherLock(partition TopicPartition, flushFn func(startTime, stopTime time.Time, buf []byte)) *log_buffer.LogBuffer {
+ tl.Lock()
+ defer tl.Unlock()
+
+ lock, found := tl.locks[partition]
+ if !found {
+ lock = &TopicLock{}
+ tl.locks[partition] = lock
+ }
+ lock.publisherCount++
+ cond := sync.NewCond(&lock.Mutex)
+ lock.logBuffer = log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
+ cond.Broadcast()
+ })
+ return lock.logBuffer
+}
+
+func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
+ tl.Lock()
+ defer tl.Unlock()
+
+ lock, found := tl.locks[partition]
+ if !found {
+ return
+ }
+ if isPublisher {
+ lock.publisherCount--
+ } else {
+ lock.subscriberCount--
+ }
+ if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
+ delete(tl.locks, partition)
+ }
+}
diff --git a/weed/messaging/broker_grpc_server_subscribe.go b/weed/messaging/broker_grpc_server_subscribe.go
deleted file mode 100644
index 137fcac8a..000000000
--- a/weed/messaging/broker_grpc_server_subscribe.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package messaging
-
-import (
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) Subscribe(server messaging_pb.SeaweedMessaging_SubscribeServer) error {
- panic("implement me")
-}