aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go38
-rw-r--r--weed/server/filer_grpc_server_listen.go2
-rw-r--r--weed/server/filer_server.go4
3 files changed, 40 insertions, 4 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 266030f5d..901f798f0 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -390,8 +390,12 @@ func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedS
}
clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort)
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
fs.brokersLock.Lock()
- fs.brokers[clientName] = true
+ fs.brokers[clientName] = m
glog.V(0).Infof("+ broker %v", clientName)
fs.brokersLock.Unlock()
@@ -417,3 +421,35 @@ func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedS
}
}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index e45068f39..848a1fc3a 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -82,7 +82,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs)
}
- _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 0a54ea97d..10b607dfe 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -64,7 +64,7 @@ type FilerServer struct {
listenersLock sync.Mutex
listenersCond *sync.Cond
- brokers map[string]bool
+ brokers map[string]map[string]bool
brokersLock sync.Mutex
}
@@ -73,7 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
- brokers: make(map[string]bool),
+ brokers: make(map[string]map[string]bool),
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)