aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-12 09:19:06 -0700
committerchrislu <chris.lu@gmail.com>2024-03-12 09:19:06 -0700
commit6f615c62aef3fc5153407bccd3c6680a589f0d4d (patch)
treed9bfd7ec2cc0157ad4156bb553ee2d1f1d117bc8 /weed
parent6f75a0af55811fcfd584f4bf136cdc42db9cff5a (diff)
parent604091a4807a56b7a56caeeb316c0e5a7642e57d (diff)
downloadseaweedfs-6f615c62aef3fc5153407bccd3c6680a589f0d4d.tar.xz
seaweedfs-6f615c62aef3fc5153407bccd3c6680a589f0d4d.zip
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/filer/meta_aggregator.go67
-rw-r--r--weed/server/volume_grpc_copy.go2
-rw-r--r--weed/storage/erasure_coding/ec_decoder.go2
-rw-r--r--weed/storage/store.go2
5 files changed, 32 insertions, 43 deletions
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index c4f0cfc3c..64583c602 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -52,7 +52,7 @@ var cmdMasterFollower = &Command{
In most cases, the master follower is not needed. In big data centers with thousands of volume
servers. In theory, the master may have trouble to keep up with the write requests and read requests.
- The master follower can relieve the master from from read requests, which only needs to
+ The master follower can relieve the master from read requests, which only needs to
lookup a fileId or volumeId.
The master follower currently can handle fileId lookup requests:
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index d013d5a19..80a041d4e 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -21,13 +21,13 @@ import (
)
type MetaAggregator struct {
- filer *Filer
- self pb.ServerAddress
- isLeader bool
- grpcDialOption grpc.DialOption
- MetaLogBuffer *log_buffer.LogBuffer
- peerStatues map[pb.ServerAddress]int
- peerStatuesLock sync.Mutex
+ filer *Filer
+ self pb.ServerAddress
+ isLeader bool
+ grpcDialOption grpc.DialOption
+ MetaLogBuffer *log_buffer.LogBuffer
+ peerChans map[pb.ServerAddress]chan struct{}
+ peerChansLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
@@ -40,7 +40,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
filer: filer,
self: self,
grpcDialOption: grpcDialOption,
- peerStatues: make(map[pb.ServerAddress]int),
+ peerChans: make(map[pb.ServerAddress]chan struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, nil, func() {
@@ -50,51 +50,40 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
}
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ ma.peerChansLock.Lock()
+ defer ma.peerChansLock.Unlock()
+
address := pb.ServerAddress(update.Address)
if update.IsAdd {
- // every filer should subscribe to a new filer
- if ma.setActive(address, true) {
- go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom)
- }
- } else {
- ma.setActive(address, false)
- }
-}
-
-func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
- ma.peerStatuesLock.Lock()
- defer ma.peerStatuesLock.Unlock()
- if isActive {
- if _, found := ma.peerStatues[address]; found {
- ma.peerStatues[address] += 1
- } else {
- ma.peerStatues[address] = 1
- notDuplicated = true
+ // cancel previous subscription if any
+ if prevChan, found := ma.peerChans[address]; found {
+ close(prevChan)
}
+ stopChan := make(chan struct{})
+ ma.peerChans[address] = stopChan
+ go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom, stopChan)
} else {
- if _, found := ma.peerStatues[address]; found {
- delete(ma.peerStatues, address)
+ if prevChan, found := ma.peerChans[address]; found {
+ close(prevChan)
+ delete(ma.peerChans, address)
}
}
- return
-}
-func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
- ma.peerStatuesLock.Lock()
- defer ma.peerStatuesLock.Unlock()
- var count int
- count, isActive = ma.peerStatues[address]
- return count > 0 && isActive
}
-func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) {
+func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
lastTsNs := startFrom.UnixNano()
for {
glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
- if !ma.isActive(peer) {
- glog.V(0).Infof("stop subscribing remote %s meta change", peer)
+
+ // check stopChan to see if we should stop
+ select {
+ case <-stopChan:
+ glog.V(0).Infof("stop subscribing peer %s meta change", peer)
return
+ default:
}
+
if err != nil {
errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 92fe0a1be..51b61b225 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -226,7 +226,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe
/*
*
-only check the the differ of the file size
+only check the differ of the file size
todo: maybe should check the received count and deleted count of the volume
*/
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, hasRemoteDatFile bool, idxFileName, datFileName string) error {
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index 9a415fc78..a94ca1fe3 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -150,7 +150,7 @@ func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId
}
-// WriteDatFile generates .dat from from .ec00 ~ .ec09 files
+// WriteDatFile generates .dat from .ec00 ~ .ec09 files
func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error {
datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 782eb5b79..cf2e0dcf9 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -345,7 +345,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
- for col, deletedBytes := range collectionVolumeDeletedBytes{
+ for col, deletedBytes := range collectionVolumeDeletedBytes {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes").Set(float64(deletedBytes))
}