diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-11-06 04:07:38 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-11-06 11:29:50 -0700 |
| commit | 330d1fde7f6a9634c1242a0490fab30cbdb12c6c (patch) | |
| tree | 8b6eecb343174cc42fb3f44ab393937166359777 /weed/server | |
| parent | 12e6692dac5a4b8152fe08c66d57eb3b0e1a4f11 (diff) | |
| download | seaweedfs-330d1fde7f6a9634c1242a0490fab30cbdb12c6c.tar.xz seaweedfs-330d1fde7f6a9634c1242a0490fab30cbdb12c6c.zip | |
send peers info to filers
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server.go | 35 | ||||
| -rw-r--r-- | weed/server/master_server.go | 4 |
2 files changed, 21 insertions, 18 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1e4bbd8e4..7411bbc99 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -44,11 +44,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for _, ch := range ms.clientChans { - ch <- message - } - ms.clientChansLock.RUnlock() + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } } }() @@ -153,12 +149,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for host, ch := range ms.clientChans { - glog.V(0).Infof("master send to %s: %s", host, message.String()) - ch <- message - } - ms.clientChansLock.RUnlock() + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } // tell the volume servers about the leader @@ -195,10 +186,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ stopChan := make(chan bool, 1) clientName, messageChan := ms.addClient(req.ClientType, peerAddress) - ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) + for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) { + ms.broadcastToClients(update) + } defer func() { - ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) + for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) { + ms.broadcastToClients(update) + } ms.deleteClient(clientName) }() @@ -223,7 +218,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ for { select { case message := <-messageChan: - if err := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); err != nil { + if err := stream.Send(message); err != nil { glog.V(0).Infof("=> client %v: %+v", clientName, message) return err } @@ -238,6 +233,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } +func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() +} + func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error { leader, err := ms.Topo.Leader() if err != nil { @@ -254,7 +257,7 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe return nil } -func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.VolumeLocation) { +func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) { clientName = clientType + "@" + string(clientAddress) glog.V(0).Infof("+ client %v", clientName) @@ -263,7 +266,7 @@ func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddr // trying to send to it in SendHeartbeat and so we can't lock the // clientChansLock to remove the channel and we're stuck writing to it // 100 is probably overkill - messageChan = make(chan *master_pb.VolumeLocation, 100) + messageChan = make(chan *master_pb.KeepConnectedResponse, 100) ms.clientChansLock.Lock() ms.clientChans[clientName] = messageChan diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 26ac91e8f..39812f641 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -61,7 +61,7 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex - clientChans map[string]chan *master_pb.VolumeLocation + clientChans map[string]chan *master_pb.KeepConnectedResponse grpcDialOption grpc.DialOption @@ -102,7 +102,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre option: option, preallocateSize: preallocateSize, vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), - clientChans: make(map[string]chan *master_pb.VolumeLocation), + clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), adminLocks: NewAdminLocks(), |
