aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go18
1 files changed, 12 insertions, 6 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 0947d259b..062f3f4bd 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "sync/atomic"
"time"
)
@@ -15,10 +16,9 @@ type LocalPartition struct {
FollowerBrokers []pb.ServerAddress
LogBuffer *log_buffer.LogBuffer
ConsumerCount int32
- StopPublishersCh chan struct{}
Publishers *LocalPartitionPublishers
- StopSubscribersCh chan struct{}
Subscribers *LocalPartitionSubscribers
+ FollowerId int32
}
var TIME_FORMAT = "2006-01-02-15-04-05"
@@ -58,6 +58,9 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
startPosition = processedPosition
processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
+ if isDone {
+ return nil
+ }
startPosition = processedPosition
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
@@ -67,9 +70,6 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
return readInMemoryLogErr
}
- if isDone {
- return nil
- }
}
}
@@ -96,7 +96,6 @@ func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition,
func (p *LocalPartition) closePublishers() {
p.Publishers.SignalShutdown()
- close(p.StopPublishersCh)
}
func (p *LocalPartition) closeSubscribers() {
p.Subscribers.SignalShutdown()
@@ -118,3 +117,10 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
}
return
}
+
+func (p *LocalPartition) Shutdown() {
+ p.closePublishers()
+ p.closeSubscribers()
+ p.LogBuffer.ShutdownLogBuffer()
+ atomic.StoreInt32(&p.FollowerId, 0)
+}