aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-08-24 01:26:56 -0700
committerChris Lu <chris.lu@gmail.com>2018-08-24 01:26:56 -0700
commit76cbe8bf3371fbdbb65cf13fe0d767164b53b151 (patch)
treeacefbaeb8ab2b66b0ddfb07415abd8c263c63e94 /weed/server
parent5ccf8e8078eee570f7ff444ea86a07589394a4ed (diff)
downloadseaweedfs-76cbe8bf3371fbdbb65cf13fe0d767164b53b151.tar.xz
seaweedfs-76cbe8bf3371fbdbb65cf13fe0d767164b53b151.zip
instant notification of new volumes added or deleted
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server.go22
-rw-r--r--weed/server/volume_grpc_client.go16
2 files changed, 31 insertions, 7 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index e12449819..d2c86598e 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -72,17 +72,24 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
- newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
-
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
}
- for _, v := range newVolumes {
- message.NewVids = append(message.NewVids, uint32(v.Id))
- }
- for _, v := range deletedVolumes {
- message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ if len(heartbeat.NewVids) > 0 || len(heartbeat.DeletedVids) > 0 {
+ // process delta volume ids if exists for fast volume id updates
+ message.NewVids = append(message.NewVids, heartbeat.NewVids...)
+ message.DeletedVids = append(message.DeletedVids, heartbeat.DeletedVids...)
+ } else {
+ // process heartbeat.Volumes
+ newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+
+ for _, v := range newVolumes {
+ message.NewVids = append(message.NewVids, uint32(v.Id))
+ }
+ for _, v := range deletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ }
}
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
@@ -169,6 +176,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
select {
case message := <-messageChan:
if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, message)
return err
}
case <-stopChan:
diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go
index de6fa23c7..219948936 100644
--- a/weed/server/volume_grpc_client.go
+++ b/weed/server/volume_grpc_client.go
@@ -89,6 +89,22 @@ func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Durati
for {
select {
+ case vid := <-vs.store.NewVolumeIdChan:
+ deltaBeat := &master_pb.Heartbeat{
+ NewVids: []uint32{uint32(vid)},
+ }
+ if err = stream.Send(deltaBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
+ case vid := <-vs.store.DeletedVolumeIdChan:
+ deltaBeat := &master_pb.Heartbeat{
+ DeletedVids: []uint32{uint32(vid)},
+ }
+ if err = stream.Send(deltaBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
case <-tickChan:
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)