diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-03-30 01:19:33 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-03-30 01:19:33 -0700 |
| commit | 50a5018b7fb259cee84471ce8d7bb6e554602c61 (patch) | |
| tree | 9c90e316fe1b7b06e3c0319a8723b85b84518a95 /weed/server | |
| parent | 9dc0b1df8f3bb19ce01b2d520436dbdc0f2a883e (diff) | |
| download | seaweedfs-50a5018b7fb259cee84471ce8d7bb6e554602c61.tar.xz seaweedfs-50a5018b7fb259cee84471ce8d7bb6e554602c61.zip | |
writing meta logs is working
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_listen.go | 70 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 7 |
2 files changed, 77 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go new file mode 100644 index 000000000..37d643ce2 --- /dev/null +++ b/weed/server/filer_grpc_server_listen.go @@ -0,0 +1,70 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fs *FilerServer) ListenForEvents(req *filer_pb.ListenForEventsRequest, stream filer_pb.SeaweedFiler_ListenForEventsServer) error { + + peerAddress := findClientAddress(stream.Context(), 0) + + clientName, messageChan := fs.addClient(req.ClientName, peerAddress) + + defer fs.deleteClient(clientName, messageChan) + + // ts := time.Unix(req.SinceSec, 0) + + // iterate through old messages + /* + for _, message := range ms.Topo.ToVolumeLocations() { + if err := stream.Send(message); err != nil { + return err + } + } + */ + + // need to add a buffer here to avoid slow clients + // also needs to support millions of clients + + for message := range messageChan { + if err := stream.Send(message); err != nil { + glog.V(0).Infof("=> client %v: %+v", clientName, message) + return err + } + } + + return nil +} + +func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *filer_pb.FullEventNotification) { + clientName = clientType + "@" + clientAddress + glog.V(0).Infof("+ listener %v", clientName) + + messageChan = make(chan *filer_pb.FullEventNotification, 10) + + fs.clientChansLock.Lock() + fs.clientChans[clientName] = messageChan + fs.clientChansLock.Unlock() + return +} + +func (fs *FilerServer) deleteClient(clientName string, messageChan chan *filer_pb.FullEventNotification) { + glog.V(0).Infof("- listener %v", clientName) + close(messageChan) + fs.clientChansLock.Lock() + delete(fs.clientChans, clientName) + fs.clientChansLock.Unlock() +} + +func (fs *FilerServer) sendMessageToClients(dir string, eventNotification *filer_pb.EventNotification) { + message := &filer_pb.FullEventNotification{ + Directory: dir, + EventNotification: eventNotification, + } + fs.clientChansLock.RLock() + for _, ch := range fs.clientChans { + ch <- message + } + fs.clientChansLock.RUnlock() +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c3b959c7c..9b688a08d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -5,11 +5,13 @@ import ( "fmt" "net/http" "os" + "sync" "time" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" @@ -52,6 +54,11 @@ type FilerServer struct { secret security.SigningKey filer *filer2.Filer grpcDialOption grpc.DialOption + + // notifying clients + clientChansLock sync.RWMutex + clientChans map[string]chan *filer_pb.FullEventNotification + } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { |
