aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_server.go4
-rw-r--r--weed/server/master_grpc_server.go84
-rw-r--r--weed/server/master_server.go2
3 files changed, 62 insertions, 28 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index bfb182dbe..57caeb6d4 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -44,7 +44,7 @@ type FilerOption struct {
DataCenter string
DefaultLevelDbDir string
DisableHttp bool
- Port int
+ Port uint32
recursiveDelete bool
}
@@ -66,7 +66,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000)
go fs.filer.KeepConnectedToMaster()
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index d308130d6..3f699a7a2 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -1,8 +1,10 @@
package weed_server
import (
+ "context"
"fmt"
"net"
+ "strings"
"time"
"github.com/chrislusf/raft"
@@ -181,35 +183,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return ms.informNewLeader(stream)
}
- // remember client address
- ctx := stream.Context()
- // fmt.Printf("FromContext %+v\n", ctx)
- pr, ok := peer.FromContext(ctx)
- if !ok {
- glog.Error("failed to get peer from ctx")
- return fmt.Errorf("failed to get peer from ctx")
- }
- if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
- return fmt.Errorf("failed to get peer address")
- }
-
- clientName := req.Name + pr.Addr.String()
- glog.V(0).Infof("+ client %v", clientName)
+ peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
- messageChan := make(chan *master_pb.VolumeLocation)
stopChan := make(chan bool)
- ms.clientChansLock.Lock()
- ms.clientChans[clientName] = messageChan
- ms.clientChansLock.Unlock()
+ clientName, messageChan := ms.addClient(req.Name, peerAddress)
- defer func() {
- glog.V(0).Infof("- client %v", clientName)
- ms.clientChansLock.Lock()
- delete(ms.clientChans, clientName)
- ms.clientChansLock.Unlock()
- }()
+ defer ms.deleteClient(clientName)
for _, message := range ms.Topo.ToVolumeLocations() {
if err := stream.Send(message); err != nil {
@@ -261,3 +241,57 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
}
return nil
}
+
+func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ client %v", clientName)
+
+ messageChan = make(chan *master_pb.VolumeLocation)
+
+ ms.clientChansLock.Lock()
+ ms.clientChans[clientName] = messageChan
+ ms.clientChansLock.Unlock()
+ return
+}
+
+func (ms *MasterServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- client %v", clientName)
+ ms.clientChansLock.Lock()
+ delete(ms.clientChans, clientName)
+ ms.clientChansLock.Unlock()
+}
+
+func findClientAddress(ctx context.Context, grpcPort uint32) string {
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return ""
+ }
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return ""
+ }
+ if grpcPort == 0 {
+ return pr.Addr.String()
+ }
+ if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
+ externalIP := tcpAddr.IP
+ return fmt.Sprintf("%s:%d", externalIP, grpcPort)
+ }
+ return pr.Addr.String()
+
+}
+
+func (ms *MasterServer ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
+ resp := &master_pb.ListMasterClientsResponse{}
+ ms.clientChansLock.RLock()
+ defer ms.clientChansLock.RUnlock()
+
+ for k := range ms.clientChans {
+ if strings.HasPrefix(k, req.ClientType+"@") {
+ resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
+ }
+ }
+ return resp, nil
+}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 095008339..a9ae6b888 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -88,7 +88,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", 0, peers),
}
ms.bounedLeaderChan = make(chan int, 16)