aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go5
-rw-r--r--weed/messaging/broker/subscription.go82
-rw-r--r--weed/messaging/broker/topic_manager.go25
3 files changed, 15 insertions, 97 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index eb6946e81..e7cbb6441 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -48,7 +48,6 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
Partition: in.Init.Partition,
}
lock := broker.topicManager.RequestLock(tp, topicConfig, false)
- subscription := lock.subscriptions.AddSubscription(subscriberId)
defer broker.topicManager.ReleaseLock(tp, false)
lastReadTime := time.Now()
@@ -103,7 +102,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
}
err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
- subscription.Wait()
+ lock.Mutex.Lock()
+ lock.cond.Wait()
+ lock.Mutex.Unlock()
return true
}, eachLogEntryFn)
diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go
deleted file mode 100644
index f74b0c546..000000000
--- a/weed/messaging/broker/subscription.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package broker
-
-import (
- "sync"
-)
-
-type TopicPartitionSubscription struct {
- sync.Mutex
- name string
- lastReadTsNs int64
- cond *sync.Cond
-}
-
-func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription {
- t := &TopicPartitionSubscription{
- name: name,
- }
- t.cond = sync.NewCond(t)
- return t
-}
-
-func (s *TopicPartitionSubscription) Wait() {
- s.Mutex.Lock()
- s.cond.Wait()
- s.Mutex.Unlock()
-}
-
-func (s *TopicPartitionSubscription) NotifyOne() {
- // notify one waiting goroutine
- s.cond.Signal()
-}
-
-type TopicPartitionSubscriptions struct {
- sync.Mutex
- cond *sync.Cond
- subscriptions map[string]*TopicPartitionSubscription
- subscriptionsLock sync.RWMutex
-}
-
-func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions {
- m := &TopicPartitionSubscriptions{
- subscriptions: make(map[string]*TopicPartitionSubscription),
- }
- m.cond = sync.NewCond(m)
- return m
-}
-
-func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription {
- m.subscriptionsLock.Lock()
- defer m.subscriptionsLock.Unlock()
-
- if s, found := m.subscriptions[subscription]; found {
- return s
- }
-
- s := NewTopicPartitionSubscription(subscription)
- m.subscriptions[subscription] = s
-
- return s
-
-}
-
-func (m *TopicPartitionSubscriptions) NotifyAll() {
-
- m.subscriptionsLock.RLock()
- defer m.subscriptionsLock.RUnlock()
-
- for name, tps := range m.subscriptions {
- println("notifying", name)
- tps.NotifyOne()
- }
-
-}
-
-func (m *TopicPartitionSubscriptions) Wait() {
- m.Mutex.Lock()
- m.cond.Wait()
- for _, tps := range m.subscriptions {
- tps.NotifyOne()
- }
- m.Mutex.Unlock()
-}
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
index e9f8903e8..b563fffa1 100644
--- a/weed/messaging/broker/topic_manager.go
+++ b/weed/messaging/broker/topic_manager.go
@@ -31,23 +31,22 @@ type TopicControl struct {
subscriberCount int
publisherCount int
logBuffer *log_buffer.LogBuffer
- subscriptions *TopicPartitionSubscriptions
}
type TopicManager struct {
sync.Mutex
- topicCursors map[TopicPartition]*TopicControl
- broker *MessageBroker
+ topicControls map[TopicPartition]*TopicControl
+ broker *MessageBroker
}
func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
return &TopicManager{
- topicCursors: make(map[TopicPartition]*TopicControl),
- broker: messageBroker,
+ topicControls: make(map[TopicPartition]*TopicControl),
+ broker: messageBroker,
}
}
-func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
flushFn := func(startTime, stopTime time.Time, buf []byte) {
@@ -69,7 +68,7 @@ func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topi
}
}
logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
- tc.subscriptions.NotifyAll()
+ tl.cond.Broadcast()
})
return logBuffer
@@ -79,11 +78,11 @@ func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messa
tm.Lock()
defer tm.Unlock()
- tc, found := tm.topicCursors[partition]
+ tc, found := tm.topicControls[partition]
if !found {
tc = &TopicControl{}
- tm.topicCursors[partition] = tc
- tc.subscriptions = NewTopicPartitionSubscriptions()
+ tc.cond = sync.NewCond(&tc.Mutex)
+ tm.topicControls[partition] = tc
tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
}
if isPublisher {
@@ -98,7 +97,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool)
tm.Lock()
defer tm.Unlock()
- lock, found := tm.topicCursors[partition]
+ lock, found := tm.topicControls[partition]
if !found {
return
}
@@ -108,7 +107,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool)
lock.subscriberCount--
}
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tm.topicCursors, partition)
+ delete(tm.topicControls, partition)
lock.logBuffer.Shutdown()
}
}
@@ -117,7 +116,7 @@ func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
tm.Lock()
defer tm.Unlock()
- for k := range tm.topicCursors {
+ for k := range tm.topicControls {
tps = append(tps, k)
}
return