diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-12 09:19:06 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-12 09:19:06 -0700 |
| commit | 6f615c62aef3fc5153407bccd3c6680a589f0d4d (patch) | |
| tree | d9bfd7ec2cc0157ad4156bb553ee2d1f1d117bc8 /weed | |
| parent | 6f75a0af55811fcfd584f4bf136cdc42db9cff5a (diff) | |
| parent | 604091a4807a56b7a56caeeb316c0e5a7642e57d (diff) | |
| download | seaweedfs-6f615c62aef3fc5153407bccd3c6680a589f0d4d.tar.xz seaweedfs-6f615c62aef3fc5153407bccd3c6680a589f0d4d.zip | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/master_follower.go | 2 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 67 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 2 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_decoder.go | 2 | ||||
| -rw-r--r-- | weed/storage/store.go | 2 |
5 files changed, 32 insertions, 43 deletions
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index c4f0cfc3c..64583c602 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -52,7 +52,7 @@ var cmdMasterFollower = &Command{ In most cases, the master follower is not needed. In big data centers with thousands of volume servers. In theory, the master may have trouble to keep up with the write requests and read requests. - The master follower can relieve the master from from read requests, which only needs to + The master follower can relieve the master from read requests, which only needs to lookup a fileId or volumeId. The master follower currently can handle fileId lookup requests: diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index d013d5a19..80a041d4e 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -21,13 +21,13 @@ import ( ) type MetaAggregator struct { - filer *Filer - self pb.ServerAddress - isLeader bool - grpcDialOption grpc.DialOption - MetaLogBuffer *log_buffer.LogBuffer - peerStatues map[pb.ServerAddress]int - peerStatuesLock sync.Mutex + filer *Filer + self pb.ServerAddress + isLeader bool + grpcDialOption grpc.DialOption + MetaLogBuffer *log_buffer.LogBuffer + peerChans map[pb.ServerAddress]chan struct{} + peerChansLock sync.Mutex // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond @@ -40,7 +40,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. filer: filer, self: self, grpcDialOption: grpcDialOption, - peerStatues: make(map[pb.ServerAddress]int), + peerChans: make(map[pb.ServerAddress]chan struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, nil, func() { @@ -50,51 +50,40 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. } func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + ma.peerChansLock.Lock() + defer ma.peerChansLock.Unlock() + address := pb.ServerAddress(update.Address) if update.IsAdd { - // every filer should subscribe to a new filer - if ma.setActive(address, true) { - go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom) - } - } else { - ma.setActive(address, false) - } -} - -func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { - ma.peerStatuesLock.Lock() - defer ma.peerStatuesLock.Unlock() - if isActive { - if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] += 1 - } else { - ma.peerStatues[address] = 1 - notDuplicated = true + // cancel previous subscription if any + if prevChan, found := ma.peerChans[address]; found { + close(prevChan) } + stopChan := make(chan struct{}) + ma.peerChans[address] = stopChan + go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom, stopChan) } else { - if _, found := ma.peerStatues[address]; found { - delete(ma.peerStatues, address) + if prevChan, found := ma.peerChans[address]; found { + close(prevChan) + delete(ma.peerChans, address) } } - return -} -func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { - ma.peerStatuesLock.Lock() - defer ma.peerStatuesLock.Unlock() - var count int - count, isActive = ma.peerStatues[address] - return count > 0 && isActive } -func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) { +func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) { lastTsNs := startFrom.UnixNano() for { glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs) nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs) - if !ma.isActive(peer) { - glog.V(0).Infof("stop subscribing remote %s meta change", peer) + + // check stopChan to see if we should stop + select { + case <-stopChan: + glog.V(0).Infof("stop subscribing peer %s meta change", peer) return + default: } + if err != nil { errLvl := glog.Level(0) if strings.Contains(err.Error(), "duplicated local subscription detected") { diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 92fe0a1be..51b61b225 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -226,7 +226,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe /* * -only check the the differ of the file size +only check the differ of the file size todo: maybe should check the received count and deleted count of the volume */ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, hasRemoteDatFile bool, idxFileName, datFileName string) error { diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index 9a415fc78..a94ca1fe3 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -150,7 +150,7 @@ func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId } -// WriteDatFile generates .dat from from .ec00 ~ .ec09 files +// WriteDatFile generates .dat from .ec00 ~ .ec09 files func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error { datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) diff --git a/weed/storage/store.go b/weed/storage/store.go index 782eb5b79..cf2e0dcf9 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -345,7 +345,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) } - for col, deletedBytes := range collectionVolumeDeletedBytes{ + for col, deletedBytes := range collectionVolumeDeletedBytes { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes").Set(float64(deletedBytes)) } |
