aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-12 22:55:55 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-12 22:55:55 -0700
commitca4017dd87ce23198e4589b7cf631ad6276ea564 (patch)
tree9fb32ce80f5ea267aeaedec652504155cd1174e1
parent25257acd51085c3be360f6f516e7019b79c1b813 (diff)
downloadseaweedfs-ca4017dd87ce23198e4589b7cf631ad6276ea564.tar.xz
seaweedfs-ca4017dd87ce23198e4589b7cf631ad6276ea564.zip
rename
-rw-r--r--weed/messaging/broker/topic_manager.go28
1 files changed, 14 insertions, 14 deletions
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
index 7cc8601eb..e9f8903e8 100644
--- a/weed/messaging/broker/topic_manager.go
+++ b/weed/messaging/broker/topic_manager.go
@@ -25,7 +25,7 @@ func (tp *TopicPartition) String() string {
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
}
-type TopicCursor struct {
+type TopicControl struct {
sync.Mutex
cond *sync.Cond
subscriberCount int
@@ -36,18 +36,18 @@ type TopicCursor struct {
type TopicManager struct {
sync.Mutex
- topicCursors map[TopicPartition]*TopicCursor
+ topicCursors map[TopicPartition]*TopicControl
broker *MessageBroker
}
func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
return &TopicManager{
- topicCursors: make(map[TopicPartition]*TopicCursor),
+ topicCursors: make(map[TopicPartition]*TopicControl),
broker: messageBroker,
}
}
-func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
flushFn := func(startTime, stopTime time.Time, buf []byte) {
@@ -69,29 +69,29 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topic
}
}
logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
- tl.subscriptions.NotifyAll()
+ tc.subscriptions.NotifyAll()
})
return logBuffer
}
-func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicCursor {
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
tm.Lock()
defer tm.Unlock()
- lock, found := tm.topicCursors[partition]
+ tc, found := tm.topicCursors[partition]
if !found {
- lock = &TopicCursor{}
- tm.topicCursors[partition] = lock
- lock.subscriptions = NewTopicPartitionSubscriptions()
- lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig)
+ tc = &TopicControl{}
+ tm.topicCursors[partition] = tc
+ tc.subscriptions = NewTopicPartitionSubscriptions()
+ tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
}
if isPublisher {
- lock.publisherCount++
+ tc.publisherCount++
} else {
- lock.subscriberCount++
+ tc.subscriberCount++
}
- return lock
+ return tc
}
func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {