diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2018-07-28 21:03:29 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-07-28 21:03:29 -0700 |
| commit | 452bd0b01393e53e958fb9825bf1f27e6b3522df (patch) | |
| tree | e1a61e592118f9696b7f51501d3b3fd0f6c3eeb5 /weed/server/master_grpc_server.go | |
| parent | 97603d6e176dd2b9f2aebd9f6122a8c60481463a (diff) | |
| parent | d3205a007071f26587affb416f71b5c63854b863 (diff) | |
| download | seaweedfs-452bd0b01393e53e958fb9825bf1f27e6b3522df.tar.xz seaweedfs-452bd0b01393e53e958fb9825bf1f27e6b3522df.zip | |
Merge pull request #702 from chrislusf/add_topo_listener
Add volume id location change listener
Diffstat (limited to 'weed/server/master_grpc_server.go')
| -rw-r--r-- | weed/server/master_grpc_server.go | 114 |
1 files changed, 107 insertions, 7 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index f24cea619..2952a8071 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -1,9 +1,11 @@ package weed_server import ( + "fmt" "net" "strings" + "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/topology" @@ -16,8 +18,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ defer func() { if dn != nil { + glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) t.UnRegisterDataNode(dn) + + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range dn.GetVolumes() { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } + + if len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() + } + } }() @@ -49,7 +69,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } - t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range newVolumes { + message.NewVids = append(message.NewVids, uint32(v.Id)) + } + for _, v := range deletedVolumes { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } + + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() + } } else { return err @@ -67,15 +106,76 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } -// KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up. +// KeepConnected keep a stream gRPC call to the master. Used by clients to know the master is up. +// And clients gets the up-to-date list of volume locations func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error { - for { - _, err := stream.Recv() - if err != nil { + + req, err := stream.Recv() + if err != nil { + return err + } + + if !ms.Topo.IsLeader() { + return raft.NotLeaderError + } + + // remember client address + ctx := stream.Context() + // fmt.Printf("FromContext %+v\n", ctx) + pr, ok := peer.FromContext(ctx) + if !ok { + glog.Error("failed to get peer from ctx") + return fmt.Errorf("failed to get peer from ctx") + } + if pr.Addr == net.Addr(nil) { + glog.Error("failed to get peer address") + return fmt.Errorf("failed to get peer address") + } + + clientName := req.Name + pr.Addr.String() + glog.V(0).Infof("+ client %v", clientName) + + messageChan := make(chan *master_pb.VolumeLocation) + stopChan := make(chan bool) + + ms.clientChansLock.Lock() + ms.clientChans[clientName] = messageChan + ms.clientChansLock.Unlock() + + defer func() { + glog.V(0).Infof("- client %v", clientName) + ms.clientChansLock.Lock() + delete(ms.clientChans, clientName) + ms.clientChansLock.Unlock() + }() + + for _, message := range ms.Topo.ToVolumeLocations() { + if err := stream.Send(message); err != nil { return err } - if err := stream.Send(&master_pb.Empty{}); err != nil { - return err + } + + go func() { + for { + _, err := stream.Recv() + if err != nil { + glog.V(2).Infof("- client %v: %v", clientName, err) + stopChan <- true + break + } + } + }() + + for { + select { + case message := <-messageChan: + if err := stream.Send(message); err != nil { + return err + } + case <-stopChan: + return nil } } + + return nil } |
