diff options
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 0947d259b..062f3f4bd 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "sync/atomic" "time" ) @@ -15,10 +16,9 @@ type LocalPartition struct { FollowerBrokers []pb.ServerAddress LogBuffer *log_buffer.LogBuffer ConsumerCount int32 - StopPublishersCh chan struct{} Publishers *LocalPartitionPublishers - StopSubscribersCh chan struct{} Subscribers *LocalPartitionSubscribers + FollowerId int32 } var TIME_FORMAT = "2006-01-02-15-04-05" @@ -58,6 +58,9 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M startPosition = processedPosition processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn) + if isDone { + return nil + } startPosition = processedPosition if readInMemoryLogErr == log_buffer.ResumeFromDiskError { @@ -67,9 +70,6 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) return readInMemoryLogErr } - if isDone { - return nil - } } } @@ -96,7 +96,6 @@ func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, func (p *LocalPartition) closePublishers() { p.Publishers.SignalShutdown() - close(p.StopPublishersCh) } func (p *LocalPartition) closeSubscribers() { p.Subscribers.SignalShutdown() @@ -118,3 +117,10 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { } return } + +func (p *LocalPartition) Shutdown() { + p.closePublishers() + p.closeSubscribers() + p.LogBuffer.ShutdownLogBuffer() + atomic.StoreInt32(&p.FollowerId, 0) +} |
