diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-09-09 11:21:23 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-09-09 11:21:23 -0700 |
| commit | 387ab6796f274151f802ccdab8756b959b5fb1cb (patch) | |
| tree | a3b95f5bdba66f12c609b5e53b262b011a47a450 /weed/server | |
| parent | 4fc0bd1a8173e284ff919edb5214f5adf7a90f06 (diff) | |
| download | seaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.tar.xz seaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.zip | |
filer: cross cluster synchronization
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_kv.go | 42 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 56 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 2 |
4 files changed, 85 insertions, 17 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 260823c49..20c2502b9 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -314,7 +314,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, nil) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) resp = &filer_pb.DeleteEntryResponse{} if err != nil { resp.Error = err.Error() diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go new file mode 100644 index 000000000..3cb47115e --- /dev/null +++ b/weed/server/filer_grpc_server_kv.go @@ -0,0 +1,42 @@ +package weed_server + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) { + + value, err := fs.filer.Store.KvGet(ctx, req.Key) + if err == filer.ErrKvNotFound { + return &filer_pb.KvGetResponse{}, nil + } + + if err != nil { + return &filer_pb.KvGetResponse{Error: err.Error()}, nil + } + + return &filer_pb.KvGetResponse{ + Value: value, + }, nil + +} + +// KvPut sets the key~value. if empty value, delete the kv entry +func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) { + + if len(req.Value) == 0 { + if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil { + return &filer_pb.KvPutResponse{Error: err.Error()}, nil + } + } + + err := fs.filer.Store.KvPut(ctx, req.Key, req.Value) + if err != nil { + return &filer_pb.KvPutResponse{Error: err.Error()}, nil + } + + return &filer_pb.KvPutResponse{}, nil + +} diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 72e2b355b..634fb5211 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "strings" "time" @@ -24,7 +25,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -37,12 +38,21 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, processedTsNs) } - err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.filer.MetaAggregator.ListenersLock.Lock() - fs.filer.MetaAggregator.ListenersCond.Wait() - fs.filer.MetaAggregator.ListenersLock.Unlock() - return true - }, eachLogEntryFn) + for { + lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.filer.MetaAggregator.ListenersLock.Lock() + fs.filer.MetaAggregator.ListenersCond.Wait() + fs.filer.MetaAggregator.ListenersLock.Unlock() + return true + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } return err @@ -59,7 +69,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -75,12 +85,21 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) // println("reading from in memory logs ...") - err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, eachLogEntryFn) + for { + lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } return err @@ -102,13 +121,20 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + foundSelf := false for _, sig := range eventNotification.Signatures { if sig == clientSignature && clientSignature != 0 { return nil } + if sig == fs.filer.Signature { + foundSelf = true + } + } + if !foundSelf { + eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature) } // get complete path to the file or directory diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 57723ab0b..93c1f41d2 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) dir, name := util.FullPath(fullFilePath).DirAndName() - return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature) + return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature}) } |
