diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 38 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_listen.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 4 |
3 files changed, 40 insertions, 4 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 266030f5d..901f798f0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -390,8 +390,12 @@ func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedS } clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort) + m := make(map[string]bool) + for _, tp := range req.Resources { + m[tp] = true + } fs.brokersLock.Lock() - fs.brokers[clientName] = true + fs.brokers[clientName] = m glog.V(0).Infof("+ broker %v", clientName) fs.brokersLock.Unlock() @@ -417,3 +421,35 @@ func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedS } } + +func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { + + resp = &filer_pb.LocateBrokerResponse{} + + fs.brokersLock.Lock() + defer fs.brokersLock.Unlock() + + var localBrokers []*filer_pb.LocateBrokerResponse_Resource + + for b, m := range fs.brokers { + if _, found := m[req.Resource]; found { + resp.Found = true + resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ + { + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }, + } + return + } + localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }) + } + + resp.Resources = localBrokers + + return resp, nil + +} diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index e45068f39..848a1fc3a 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -82,7 +82,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, processedTsNs) } - _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 0a54ea97d..10b607dfe 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -64,7 +64,7 @@ type FilerServer struct { listenersLock sync.Mutex listenersCond *sync.Cond - brokers map[string]bool + brokers map[string]map[string]bool brokersLock sync.Mutex } @@ -73,7 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), - brokers: make(map[string]bool), + brokers: make(map[string]map[string]bool), } fs.listenersCond = sync.NewCond(&fs.listenersLock) |
