aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_grpc_server.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-07-27 23:09:55 -0700
committerChris Lu <chris.lu@gmail.com>2018-07-27 23:09:55 -0700
commita12c7b86b0ca7ebd35f8763ebc89c675a49f8c59 (patch)
tree9dc178cc45dbc03ffce8b15a7a39fbe37b3b1e95 /weed/server/master_grpc_server.go
parentf82ac793b4db7dfcca3d69a5a9c7d0bb3e20d9ac (diff)
downloadseaweedfs-a12c7b86b0ca7ebd35f8763ebc89c675a49f8c59.tar.xz
seaweedfs-a12c7b86b0ca7ebd35f8763ebc89c675a49f8c59.zip
broadcast messages of new and deleted volumes
Diffstat (limited to 'weed/server/master_grpc_server.go')
-rw-r--r--weed/server/master_grpc_server.go102
1 files changed, 95 insertions, 7 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index f24cea619..c12938374 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer"
+ "fmt"
)
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
@@ -16,8 +17,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
defer func() {
if dn != nil {
+
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
t.UnRegisterDataNode(dn)
+
+ message := &master_pb.VolumeLocation{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ }
+ for _, v := range dn.GetVolumes() {
+ message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ }
+
+ if len(message.DeletedVids) > 0 {
+ ms.clientChansLock.RLock()
+ for _, ch := range ms.clientChans {
+ ch <- message
+ }
+ ms.clientChansLock.RUnlock()
+ }
+
}
}()
@@ -49,7 +68,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
- t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+
+ message := &master_pb.VolumeLocation{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ }
+ for _, v := range newVolumes {
+ message.NewVids = append(message.NewVids, uint32(v.Id))
+ }
+ for _, v := range deletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ }
+
+ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
+ ms.clientChansLock.RLock()
+ for _, ch := range ms.clientChans {
+ ch <- message
+ }
+ ms.clientChansLock.RUnlock()
+ }
} else {
return err
@@ -69,13 +107,63 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up.
func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
- for {
- _, err := stream.Recv()
- if err != nil {
- return err
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ // remember client address
+ ctx := stream.Context()
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return fmt.Errorf("failed to get peer from ctx")
+ }
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return fmt.Errorf("failed to get peer address")
+ }
+
+ clientName := req.Name + pr.Addr.String()
+ glog.V(0).Infof("+ client %v", clientName)
+
+ messageChan := make(chan *master_pb.VolumeLocation)
+ stopChan := make(chan bool)
+
+ ms.clientChansLock.Lock()
+ ms.clientChans[clientName] = messageChan
+ ms.clientChansLock.Unlock()
+
+ defer func() {
+ glog.V(0).Infof("- client %v", clientName)
+ ms.clientChansLock.Lock()
+ delete(ms.clientChans, clientName)
+ ms.clientChansLock.Unlock()
+ }()
+
+ go func() {
+ for {
+ _, err := stream.Recv()
+ if err != nil {
+ glog.V(2).Infof("- client %v: %v", clientName, err)
+ stopChan <- true
+ break
+ }
}
- if err := stream.Send(&master_pb.Empty{}); err != nil {
- return err
+ }()
+
+ for {
+ select {
+ case message := <-messageChan:
+ if err := stream.Send(message); err != nil {
+ return err
+ }
+ case <-stopChan:
+ return nil
}
}
+
+ return nil
}