diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 72 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_listen.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 7 |
3 files changed, 79 insertions, 2 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index a790e4601..901f798f0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -381,3 +381,75 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. return t, nil } + +func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { + + req, err := stream.Recv() + if err != nil { + return err + } + + clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort) + m := make(map[string]bool) + for _, tp := range req.Resources { + m[tp] = true + } + fs.brokersLock.Lock() + fs.brokers[clientName] = m + glog.V(0).Infof("+ broker %v", clientName) + fs.brokersLock.Unlock() + + defer func() { + fs.brokersLock.Lock() + delete(fs.brokers, clientName) + glog.V(0).Infof("- broker %v: %v", clientName, err) + fs.brokersLock.Unlock() + }() + + for { + if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { + glog.V(0).Infof("send broker %v: %+v", clientName, err) + return err + } + // println("replied") + + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("recv broker %v: %v", clientName, err) + return err + } + // println("received") + } + +} + +func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { + + resp = &filer_pb.LocateBrokerResponse{} + + fs.brokersLock.Lock() + defer fs.brokersLock.Unlock() + + var localBrokers []*filer_pb.LocateBrokerResponse_Resource + + for b, m := range fs.brokers { + if _, found := m[req.Resource]; found { + resp.Found = true + resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ + { + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }, + } + return + } + localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }) + } + + resp.Resources = localBrokers + + return resp, nil + +} diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index e45068f39..848a1fc3a 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -82,7 +82,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, processedTsNs) } - _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 956684d46..10b607dfe 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -8,9 +8,10 @@ import ( "sync" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -62,6 +63,9 @@ type FilerServer struct { // notifying clients listenersLock sync.Mutex listenersCond *sync.Cond + + brokers map[string]map[string]bool + brokersLock sync.Mutex } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -69,6 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), + brokers: make(map[string]map[string]bool), } fs.listenersCond = sync.NewCond(&fs.listenersLock) |
