aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-30 01:19:33 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-30 01:19:33 -0700
commit50a5018b7fb259cee84471ce8d7bb6e554602c61 (patch)
tree9c90e316fe1b7b06e3c0319a8723b85b84518a95 /weed/server
parent9dc0b1df8f3bb19ce01b2d520436dbdc0f2a883e (diff)
downloadseaweedfs-50a5018b7fb259cee84471ce8d7bb6e554602c61.tar.xz
seaweedfs-50a5018b7fb259cee84471ce8d7bb6e554602c61.zip
writing meta logs is working
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_listen.go70
-rw-r--r--weed/server/filer_server.go7
2 files changed, 77 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
new file mode 100644
index 000000000..37d643ce2
--- /dev/null
+++ b/weed/server/filer_grpc_server_listen.go
@@ -0,0 +1,70 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) ListenForEvents(req *filer_pb.ListenForEventsRequest, stream filer_pb.SeaweedFiler_ListenForEventsServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName, messageChan := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName, messageChan)
+
+ // ts := time.Unix(req.SinceSec, 0)
+
+ // iterate through old messages
+ /*
+ for _, message := range ms.Topo.ToVolumeLocations() {
+ if err := stream.Send(message); err != nil {
+ return err
+ }
+ }
+ */
+
+ // need to add a buffer here to avoid slow clients
+ // also needs to support millions of clients
+
+ for message := range messageChan {
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, message)
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *filer_pb.FullEventNotification) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ listener %v", clientName)
+
+ messageChan = make(chan *filer_pb.FullEventNotification, 10)
+
+ fs.clientChansLock.Lock()
+ fs.clientChans[clientName] = messageChan
+ fs.clientChansLock.Unlock()
+ return
+}
+
+func (fs *FilerServer) deleteClient(clientName string, messageChan chan *filer_pb.FullEventNotification) {
+ glog.V(0).Infof("- listener %v", clientName)
+ close(messageChan)
+ fs.clientChansLock.Lock()
+ delete(fs.clientChans, clientName)
+ fs.clientChansLock.Unlock()
+}
+
+func (fs *FilerServer) sendMessageToClients(dir string, eventNotification *filer_pb.EventNotification) {
+ message := &filer_pb.FullEventNotification{
+ Directory: dir,
+ EventNotification: eventNotification,
+ }
+ fs.clientChansLock.RLock()
+ for _, ch := range fs.clientChans {
+ ch <- message
+ }
+ fs.clientChansLock.RUnlock()
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index c3b959c7c..9b688a08d 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -5,11 +5,13 @@ import (
"fmt"
"net/http"
"os"
+ "sync"
"time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -52,6 +54,11 @@ type FilerServer struct {
secret security.SigningKey
filer *filer2.Filer
grpcDialOption grpc.DialOption
+
+ // notifying clients
+ clientChansLock sync.RWMutex
+ clientChans map[string]chan *filer_pb.FullEventNotification
+
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {