aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync_jobs.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_sync_jobs.go')
-rw-r--r--weed/command/filer_sync_jobs.go148
1 files changed, 148 insertions, 0 deletions
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go
new file mode 100644
index 000000000..0d4d83adb
--- /dev/null
+++ b/weed/command/filer_sync_jobs.go
@@ -0,0 +1,148 @@
+package command
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "sync"
+)
+
+type MetadataProcessFunc func(resp *filer_pb.SubscribeMetadataResponse) error
+
+type MetadataProcessor struct {
+ activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
+ activeJobsLock sync.Mutex
+ activeJobsCond *sync.Cond
+ concurrencyLimit int
+ fn MetadataProcessFunc
+ processedTsWatermark int64
+}
+
+func NewMetadataProcessor(fn MetadataProcessFunc, concurrency int) *MetadataProcessor {
+ t := &MetadataProcessor{
+ fn: fn,
+ activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
+ concurrencyLimit: concurrency,
+ }
+ t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
+ return t
+}
+
+func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
+ if filer_pb.IsEmpty(resp) {
+ return
+ }
+
+ t.activeJobsLock.Lock()
+ defer t.activeJobsLock.Unlock()
+
+ for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
+ t.activeJobsCond.Wait()
+ }
+ t.activeJobs[resp.TsNs] = resp
+ go func() {
+
+ util.RetryForever("metadata processor", func() error {
+ return t.fn(resp)
+ }, func(err error) bool {
+ glog.Errorf("process %v: %v", resp, err)
+ return true
+ })
+
+ t.activeJobsLock.Lock()
+ defer t.activeJobsLock.Unlock()
+
+ delete(t.activeJobs, resp.TsNs)
+
+ // if is the oldest job, write down the watermark
+ isOldest := true
+ for t, _ := range t.activeJobs {
+ if resp.TsNs > t {
+ isOldest = false
+ break
+ }
+ }
+ if isOldest {
+ t.processedTsWatermark = resp.TsNs
+ }
+ t.activeJobsCond.Signal()
+ }()
+}
+
+func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
+ for _, r := range t.activeJobs {
+ if shouldWaitFor(resp, r) {
+ return true
+ }
+ }
+ return false
+}
+
+// a is one possible job to schedule
+// b is one existing active job
+func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
+ aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
+ bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)
+
+ if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ if aNewPath != "" {
+ if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ }
+ if bNewPath != "" {
+ if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ }
+ if aNewPath != "" && bNewPath != "" {
+ if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ }
+ return false
+}
+
+func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
+ if bIsDirectory {
+ if aIsDirectory {
+ return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
+ } else {
+ return aPath.IsUnder(bPath)
+ }
+ } else {
+ if aIsDirectory {
+ return bPath.IsUnder(aPath)
+ } else {
+ return aPath == bPath
+ }
+ }
+}
+
+func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
+ oldEntry := resp.EventNotification.OldEntry
+ newEntry := resp.EventNotification.NewEntry
+ // create
+ if filer_pb.IsCreate(resp) {
+ path = util.FullPath(resp.Directory).Child(newEntry.Name)
+ isDirectory = newEntry.IsDirectory
+ return
+ }
+ if filer_pb.IsDelete(resp) {
+ path = util.FullPath(resp.Directory).Child(oldEntry.Name)
+ isDirectory = oldEntry.IsDirectory
+ return
+ }
+ if filer_pb.IsUpdate(resp) {
+ path = util.FullPath(resp.Directory).Child(newEntry.Name)
+ isDirectory = newEntry.IsDirectory
+ return
+ }
+ // renaming
+ path = util.FullPath(resp.Directory).Child(oldEntry.Name)
+ isDirectory = oldEntry.IsDirectory
+ newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
+ return
+}