diff options
| author | HongyanShen <763987993@qq.com> | 2020-03-11 12:55:24 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-11 12:55:24 +0800 |
| commit | 03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch) | |
| tree | ed8833386a712c850dcef0815509774681a6ab56 /weed/server/master_grpc_server.go | |
| parent | 0fca1ae776783b37481549df40f477b7d9248d3c (diff) | |
| parent | 60f5f05c78a2918d5219c925cea5847759281a2c (diff) | |
| download | seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip | |
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/server/master_grpc_server.go')
| -rw-r--r-- | weed/server/master_grpc_server.go | 104 |
1 files changed, 66 insertions, 38 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 82a190e39..84087df8b 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -1,17 +1,20 @@ package weed_server import ( + "context" "fmt" "net" "strings" "time" "github.com/chrislusf/raft" + "google.golang.org/grpc/peer" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" - "google.golang.org/grpc/peer" ) func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { @@ -60,14 +63,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ t.Sequence.SetMax(heartbeat.MaxFileKey) if dn == nil { - if heartbeat.Ip == "" { - if pr, ok := peer.FromContext(stream.Context()); ok { - if pr.Addr != net.Addr(nil) { - heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")] - glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip) - } - } - } dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) @@ -76,7 +71,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, + VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + StorageBackends: backend.ToPbStorageBackends(), }); err != nil { glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err) return err @@ -164,9 +162,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: newLeader, - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + Leader: newLeader, }); err != nil { glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err) return err @@ -187,35 +183,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ return ms.informNewLeader(stream) } - // remember client address - ctx := stream.Context() - // fmt.Printf("FromContext %+v\n", ctx) - pr, ok := peer.FromContext(ctx) - if !ok { - glog.Error("failed to get peer from ctx") - return fmt.Errorf("failed to get peer from ctx") - } - if pr.Addr == net.Addr(nil) { - glog.Error("failed to get peer address") - return fmt.Errorf("failed to get peer address") - } + peerAddress := findClientAddress(stream.Context(), req.GrpcPort) - clientName := req.Name + pr.Addr.String() - glog.V(0).Infof("+ client %v", clientName) - - messageChan := make(chan *master_pb.VolumeLocation) stopChan := make(chan bool) - ms.clientChansLock.Lock() - ms.clientChans[clientName] = messageChan - ms.clientChansLock.Unlock() + clientName, messageChan := ms.addClient(req.Name, peerAddress) - defer func() { - glog.V(0).Infof("- client %v", clientName) - ms.clientChansLock.Lock() - delete(ms.clientChans, clientName) - ms.clientChansLock.Unlock() - }() + defer ms.deleteClient(clientName) for _, message := range ms.Topo.ToVolumeLocations() { if err := stream.Send(message); err != nil { @@ -267,3 +241,57 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe } return nil } + +func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) { + clientName = clientType + "@" + clientAddress + glog.V(0).Infof("+ client %v", clientName) + + messageChan = make(chan *master_pb.VolumeLocation) + + ms.clientChansLock.Lock() + ms.clientChans[clientName] = messageChan + ms.clientChansLock.Unlock() + return +} + +func (ms *MasterServer) deleteClient(clientName string) { + glog.V(0).Infof("- client %v", clientName) + ms.clientChansLock.Lock() + delete(ms.clientChans, clientName) + ms.clientChansLock.Unlock() +} + +func findClientAddress(ctx context.Context, grpcPort uint32) string { + // fmt.Printf("FromContext %+v\n", ctx) + pr, ok := peer.FromContext(ctx) + if !ok { + glog.Error("failed to get peer from ctx") + return "" + } + if pr.Addr == net.Addr(nil) { + glog.Error("failed to get peer address") + return "" + } + if grpcPort == 0 { + return pr.Addr.String() + } + if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok { + externalIP := tcpAddr.IP + return fmt.Sprintf("%s:%d", externalIP, grpcPort) + } + return pr.Addr.String() + +} + +func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) { + resp := &master_pb.ListMasterClientsResponse{} + ms.clientChansLock.RLock() + defer ms.clientChansLock.RUnlock() + + for k := range ms.clientChans { + if strings.HasPrefix(k, req.ClientType+"@") { + resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:]) + } + } + return resp, nil +} |
