aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/mq/broker/broker_server.go2
-rw-r--r--weed/server/master_server.go2
-rw-r--r--weed/wdclient/masterclient.go12
4 files changed, 14 insertions, 4 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 993175112..fe5fe289a 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -96,7 +96,7 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
- f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
+ f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate)
for _, peerUpdate := range existingNodes {
f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom)
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 89afb6e4d..4c86d813f 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -42,7 +42,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
filers: make(map[pb.ServerAddress]struct{}),
}
- mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate
+ mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
go mqBroker.MasterClient.KeepConnectedToMaster()
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index ecbfd64af..758f212ad 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -116,7 +116,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
}
ms.boundedLeaderChan = make(chan int, 16)
- ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
+ ms.MasterClient.SetOnPeerUpdateFn(ms.OnPeerUpdate)
seq := ms.createSequencer(option)
if nil == seq {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 2583bda80..6e91c31eb 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -28,7 +29,8 @@ type MasterClient struct {
vidMap
vidMapCacheSize int
- OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
+ OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
+ OnPeerUpdateAccessLock sync.RWMutex
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient {
@@ -44,6 +46,12 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy
}
}
+func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
+ mc.OnPeerUpdateAccessLock.Lock()
+ mc.OnPeerUpdate = onPeerUpdate
+ mc.OnPeerUpdateAccessLock.Unlock()
+}
+
func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
return mc.LookupFileIdWithFallback
}
@@ -219,6 +227,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
+ mc.OnPeerUpdateAccessLock.RLock()
if mc.OnPeerUpdate != nil {
if update.FilerGroup == mc.FilerGroup {
if update.IsAdd {
@@ -230,6 +239,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
mc.OnPeerUpdate(update, time.Now())
}
}
+ mc.OnPeerUpdateAccessLock.RUnlock()
}
}
})