aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-12 08:48:00 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-12 08:48:00 -0700
commit2f243f5b0b7c037eb13ae14f58bd1f8608fbc4d0 (patch)
tree47ab19b42e09549a37ab7e59754d1388f1d0acf8
parent4b7fa31468eb1b8e0af53543a7e760c517faf227 (diff)
downloadseaweedfs-2f243f5b0b7c037eb13ae14f58bd1f8608fbc4d0.tar.xz
seaweedfs-2f243f5b0b7c037eb13ae14f58bd1f8608fbc4d0.zip
refactor
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go4
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go4
-rw-r--r--weed/messaging/broker/broker_server.go6
-rw-r--r--weed/messaging/broker/topic_manager.go (renamed from weed/messaging/broker/topic_lock.go)52
4 files changed, 33 insertions, 33 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 573706c06..dc11061af 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -57,8 +57,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
return fmt.Errorf("channel is already closed")
}
- tl := broker.topicLocks.RequestLock(tp, topicConfig, true)
- defer broker.topicLocks.ReleaseLock(tp, true)
+ tl := broker.topicManager.RequestLock(tp, topicConfig, true)
+ defer broker.topicManager.ReleaseLock(tp, true)
md5hash := md5.New()
// process each message
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 76cbdef24..e7cbb6441 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -47,8 +47,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
Topic: in.Init.Topic,
Partition: in.Init.Partition,
}
- lock := broker.topicLocks.RequestLock(tp, topicConfig, false)
- defer broker.topicLocks.ReleaseLock(tp, false)
+ lock := broker.topicManager.RequestLock(tp, topicConfig, false)
+ defer broker.topicManager.ReleaseLock(tp, false)
lastReadTime := time.Now()
switch in.Init.StartPosition {
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index e6ff2cf00..0c04d2841 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -24,7 +24,7 @@ type MessageBrokerOption struct {
type MessageBroker struct {
option *MessageBrokerOption
grpcDialOption grpc.DialOption
- topicLocks *TopicLocks
+ topicManager *TopicManager
}
func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
@@ -34,7 +34,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
grpcDialOption: grpcDialOption,
}
- messageBroker.topicLocks = NewTopicLocks(messageBroker)
+ messageBroker.topicManager = NewTopicManager(messageBroker)
messageBroker.checkFilers()
@@ -58,7 +58,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}
- for _, tp := range broker.topicLocks.ListTopicPartitions() {
+ for _, tp := range broker.topicManager.ListTopicPartitions() {
initRequest.Resources = append(initRequest.Resources, tp.String())
}
if err := stream.Send(&filer_pb.KeepConnectedRequest{
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_manager.go
index 9ae446df3..21594dea5 100644
--- a/weed/messaging/broker/topic_lock.go
+++ b/weed/messaging/broker/topic_manager.go
@@ -25,7 +25,7 @@ func (tp *TopicPartition) String() string {
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
}
-type TopicLock struct {
+type TopicControl struct {
sync.Mutex
cond *sync.Cond
subscriberCount int
@@ -33,20 +33,20 @@ type TopicLock struct {
logBuffer *log_buffer.LogBuffer
}
-type TopicLocks struct {
+type TopicManager struct {
sync.Mutex
- locks map[TopicPartition]*TopicLock
- broker *MessageBroker
+ topicControls map[TopicPartition]*TopicControl
+ broker *MessageBroker
}
-func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks {
- return &TopicLocks{
- locks: make(map[TopicPartition]*TopicLock),
- broker: messageBroker,
+func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
+ return &TopicManager{
+ topicControls: make(map[TopicPartition]*TopicControl),
+ broker: messageBroker,
}
}
-func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
flushFn := func(startTime, stopTime time.Time, buf []byte) {
@@ -63,7 +63,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC
tp.Partition,
)
- if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
}
}
@@ -74,16 +74,16 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC
return logBuffer
}
-func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock {
- tl.Lock()
- defer tl.Unlock()
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
+ tm.Lock()
+ defer tm.Unlock()
- lock, found := tl.locks[partition]
+ lock, found := tm.topicControls[partition]
if !found {
- lock = &TopicLock{}
+ lock = &TopicControl{}
lock.cond = sync.NewCond(&lock.Mutex)
- tl.locks[partition] = lock
- lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig)
+ tm.topicControls[partition] = lock
+ lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig)
}
if isPublisher {
lock.publisherCount++
@@ -93,11 +93,11 @@ func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messagi
return lock
}
-func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
- tl.Lock()
- defer tl.Unlock()
+func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
+ tm.Lock()
+ defer tm.Unlock()
- lock, found := tl.locks[partition]
+ lock, found := tm.topicControls[partition]
if !found {
return
}
@@ -107,16 +107,16 @@ func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
lock.subscriberCount--
}
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tl.locks, partition)
+ delete(tm.topicControls, partition)
lock.logBuffer.Shutdown()
}
}
-func (tl *TopicLocks) ListTopicPartitions() (tps []TopicPartition) {
- tl.Lock()
- defer tl.Unlock()
+func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
+ tm.Lock()
+ defer tm.Unlock()
- for k := range tl.locks {
+ for k := range tm.topicControls {
tps = append(tps, k)
}
return