diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-07-27 23:09:55 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-07-27 23:09:55 -0700 |
| commit | a12c7b86b0ca7ebd35f8763ebc89c675a49f8c59 (patch) | |
| tree | 9dc178cc45dbc03ffce8b15a7a39fbe37b3b1e95 /weed/server/master_grpc_server.go | |
| parent | f82ac793b4db7dfcca3d69a5a9c7d0bb3e20d9ac (diff) | |
| download | seaweedfs-a12c7b86b0ca7ebd35f8763ebc89c675a49f8c59.tar.xz seaweedfs-a12c7b86b0ca7ebd35f8763ebc89c675a49f8c59.zip | |
broadcast messages of new and deleted volumes
Diffstat (limited to 'weed/server/master_grpc_server.go')
| -rw-r--r-- | weed/server/master_grpc_server.go | 102 |
1 files changed, 95 insertions, 7 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index f24cea619..c12938374 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/topology" "google.golang.org/grpc/peer" + "fmt" ) func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { @@ -16,8 +17,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ defer func() { if dn != nil { + glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) t.UnRegisterDataNode(dn) + + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range dn.GetVolumes() { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } + + if len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() + } + } }() @@ -49,7 +68,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } - t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range newVolumes { + message.NewVids = append(message.NewVids, uint32(v.Id)) + } + for _, v := range deletedVolumes { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } + + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() + } } else { return err @@ -69,13 +107,63 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up. func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error { - for { - _, err := stream.Recv() - if err != nil { - return err + + req, err := stream.Recv() + if err != nil { + return err + } + + // remember client address + ctx := stream.Context() + // fmt.Printf("FromContext %+v\n", ctx) + pr, ok := peer.FromContext(ctx) + if !ok { + glog.Error("failed to get peer from ctx") + return fmt.Errorf("failed to get peer from ctx") + } + if pr.Addr == net.Addr(nil) { + glog.Error("failed to get peer address") + return fmt.Errorf("failed to get peer address") + } + + clientName := req.Name + pr.Addr.String() + glog.V(0).Infof("+ client %v", clientName) + + messageChan := make(chan *master_pb.VolumeLocation) + stopChan := make(chan bool) + + ms.clientChansLock.Lock() + ms.clientChans[clientName] = messageChan + ms.clientChansLock.Unlock() + + defer func() { + glog.V(0).Infof("- client %v", clientName) + ms.clientChansLock.Lock() + delete(ms.clientChans, clientName) + ms.clientChansLock.Unlock() + }() + + go func() { + for { + _, err := stream.Recv() + if err != nil { + glog.V(2).Infof("- client %v: %v", clientName, err) + stopChan <- true + break + } } - if err := stream.Send(&master_pb.Empty{}); err != nil { - return err + }() + + for { + select { + case message := <-messageChan: + if err := stream.Send(message); err != nil { + return err + } + case <-stopChan: + return nil } } + + return nil } |
