diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_rename.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 2 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 11 |
7 files changed, 26 insertions, 15 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 5d066f9a0..405742e1e 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -171,7 +171,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr FullPath: util.JoinPath(req.Directory, req.Entry.Name), Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, - }, req.OExcl, req.IsFromOtherCluster) + }, req.OExcl, req.IsFromOtherCluster, req.Signatures) if createErr == nil { fs.filer.DeleteChunks(garbage) @@ -235,7 +235,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } - fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster) + fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures) return &filer_pb.UpdateEntryResponse{}, err } @@ -312,7 +312,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo glog.V(0).Infof("MaybeManifestize: %v", err) } - err = fs.filer.CreateEntry(context.Background(), entry, false, false) + err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil) return &filer_pb.AppendToEntryResponse{}, err } @@ -321,7 +321,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, nil) resp = &filer_pb.DeleteEntryResponse{} if err != nil { resp.Error = err.Error() diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 9642fec24..cbb71682c 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -110,7 +110,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat Attr: entry.Attr, Chunks: entry.Chunks, } - createErr := fs.filer.CreateEntry(ctx, newEntry, false, false) + createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil) if createErr != nil { return createErr } @@ -124,7 +124,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } // delete old entry - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 4341f2091..2ad12e9c8 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,7 +24,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -104,9 +104,15 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + for _, sig := range eventNotification.Signatures { + if sig == clientSignature && clientSignature != 0 { + return nil + } + } + // get complete path to the file or directory var entryName string if eventNotification.OldEntry != nil { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index d22376a45..0091ae3ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -111,7 +111,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { objectPath = objectPath[0 : len(objectPath)-1] } - err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil) if err != nil { glog.V(1).Infoln("deleting", objectPath, ":", err.Error()) httpStatus := http.StatusInternalServerError diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 0365ea3ab..266970618 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -172,7 +172,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Size: chunkOffset, } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 6ec06d3de..670399425 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -80,7 +80,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Size: int64(pu.OriginalDataSize), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) err = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 3d2629c19..f06189e34 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -70,6 +70,7 @@ type WebDavFileSystem struct { filer *filer2.Filer grpcDialOption grpc.DialOption chunkCache *chunk_cache.TieredChunkCache + signature int32 } type FileInfo struct { @@ -103,6 +104,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { return &WebDavFileSystem{ option: option, chunkCache: chunkCache, + signature: util.RandomInt32(), }, nil } @@ -165,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm Gid: fs.option.Gid, }, }, + Signatures: []int32{fs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -216,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f TtlSec: 0, }, }, + Signatures: []int32{fs.signature}, }); err != nil { return fmt.Errorf("create %s: %v", fullFilePath, err) } @@ -255,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) dir, name := util.FullPath(fullFilePath).DirAndName() - return filer_pb.Remove(fs, dir, name, true, false, false, false) + return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature) } @@ -422,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Attributes.Replication = replication request := &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: f.entry, + Directory: dir, + Entry: f.entry, + Signatures: []int32{f.fs.signature}, } if _, err := client.UpdateEntry(ctx, request); err != nil { |
