aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-07 00:56:15 -0700
committerchrislu <chris.lu@gmail.com>2022-08-07 00:56:15 -0700
commit1a4bf0dcb5852c62070fe9627b7e63c5d55bb460 (patch)
tree33029c976e849f6e5a29e2eb7cd5d905a77307a8
parent02b38f894b97be945e7be69a0f897d24f726a957 (diff)
downloadseaweedfs-1a4bf0dcb5852c62070fe9627b7e63c5d55bb460.tar.xz
seaweedfs-1a4bf0dcb5852c62070fe9627b7e63c5d55bb460.zip
filer.sync: parallelize the filer.sync
-rw-r--r--weed/command/filer_sync.go13
-rw-r--r--weed/command/filer_sync_jobs.go148
-rw-r--r--weed/util/fullpath.go8
3 files changed, 165 insertions, 4 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 9d5b9d831..8eb68c098 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -251,16 +251,21 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
return persistEventFn(resp)
}
+ processor := NewMetadataProcessor(processEventFn, 128)
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
- processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
+ processor.AddSyncJob(resp)
+ return nil
+ }, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano()
- glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
// collect synchronous offset
- statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs))
- return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
+ statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark))
+ return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch,
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go
new file mode 100644
index 000000000..0d4d83adb
--- /dev/null
+++ b/weed/command/filer_sync_jobs.go
@@ -0,0 +1,148 @@
+package command
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "sync"
+)
+
+type MetadataProcessFunc func(resp *filer_pb.SubscribeMetadataResponse) error
+
+type MetadataProcessor struct {
+ activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
+ activeJobsLock sync.Mutex
+ activeJobsCond *sync.Cond
+ concurrencyLimit int
+ fn MetadataProcessFunc
+ processedTsWatermark int64
+}
+
+func NewMetadataProcessor(fn MetadataProcessFunc, concurrency int) *MetadataProcessor {
+ t := &MetadataProcessor{
+ fn: fn,
+ activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
+ concurrencyLimit: concurrency,
+ }
+ t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
+ return t
+}
+
+func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
+ if filer_pb.IsEmpty(resp) {
+ return
+ }
+
+ t.activeJobsLock.Lock()
+ defer t.activeJobsLock.Unlock()
+
+ for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
+ t.activeJobsCond.Wait()
+ }
+ t.activeJobs[resp.TsNs] = resp
+ go func() {
+
+ util.RetryForever("metadata processor", func() error {
+ return t.fn(resp)
+ }, func(err error) bool {
+ glog.Errorf("process %v: %v", resp, err)
+ return true
+ })
+
+ t.activeJobsLock.Lock()
+ defer t.activeJobsLock.Unlock()
+
+ delete(t.activeJobs, resp.TsNs)
+
+ // if is the oldest job, write down the watermark
+ isOldest := true
+ for t, _ := range t.activeJobs {
+ if resp.TsNs > t {
+ isOldest = false
+ break
+ }
+ }
+ if isOldest {
+ t.processedTsWatermark = resp.TsNs
+ }
+ t.activeJobsCond.Signal()
+ }()
+}
+
+func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
+ for _, r := range t.activeJobs {
+ if shouldWaitFor(resp, r) {
+ return true
+ }
+ }
+ return false
+}
+
+// a is one possible job to schedule
+// b is one existing active job
+func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
+ aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
+ bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)
+
+ if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ if aNewPath != "" {
+ if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ }
+ if bNewPath != "" {
+ if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ }
+ if aNewPath != "" && bNewPath != "" {
+ if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
+ return true
+ }
+ }
+ return false
+}
+
+func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
+ if bIsDirectory {
+ if aIsDirectory {
+ return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
+ } else {
+ return aPath.IsUnder(bPath)
+ }
+ } else {
+ if aIsDirectory {
+ return bPath.IsUnder(aPath)
+ } else {
+ return aPath == bPath
+ }
+ }
+}
+
+func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
+ oldEntry := resp.EventNotification.OldEntry
+ newEntry := resp.EventNotification.NewEntry
+ // create
+ if filer_pb.IsCreate(resp) {
+ path = util.FullPath(resp.Directory).Child(newEntry.Name)
+ isDirectory = newEntry.IsDirectory
+ return
+ }
+ if filer_pb.IsDelete(resp) {
+ path = util.FullPath(resp.Directory).Child(oldEntry.Name)
+ isDirectory = oldEntry.IsDirectory
+ return
+ }
+ if filer_pb.IsUpdate(resp) {
+ path = util.FullPath(resp.Directory).Child(newEntry.Name)
+ isDirectory = newEntry.IsDirectory
+ return
+ }
+ // renaming
+ path = util.FullPath(resp.Directory).Child(oldEntry.Name)
+ isDirectory = oldEntry.IsDirectory
+ newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
+ return
+}
diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go
index 92580dc38..6ac2af14f 100644
--- a/weed/util/fullpath.go
+++ b/weed/util/fullpath.go
@@ -63,6 +63,14 @@ func Join(names ...string) string {
func JoinPath(names ...string) FullPath {
return FullPath(Join(names...))
}
+
+func (fp FullPath) IsUnder(other FullPath) bool {
+ if other == "/" {
+ return true
+ }
+ return strings.HasPrefix(string(fp), string(other)+"/")
+}
+
func Split(separatedValues string, sep string) []string {
if separatedValues == "" {
return nil