aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-06 00:12:41 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-06 00:12:41 -0700
commitc9f8f25ba5158ab02ad24f8375d6017d08069ecb (patch)
treeeea19964f64db06d8afbe76ef9cd040e61642623
parent30dc365cbdb44797c3b360ad3793a03fdd70ffff (diff)
downloadseaweedfs-c9f8f25ba5158ab02ad24f8375d6017d08069ecb.tar.xz
seaweedfs-c9f8f25ba5158ab02ad24f8375d6017d08069ecb.zip
read peer filer from start
-rw-r--r--weed/filer/meta_aggregator.go17
1 files changed, 10 insertions, 7 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 7a5329d74..367c0d150 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -60,11 +60,8 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
lastPersistTime := time.Now()
- changesSinceLastPersist := 0
lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
- MaxChangeLimit := 100
-
isSameFilerStore, err := ma.isSameFilerStore(f, filer)
for err != nil {
glog.V(0).Infof("connecting to peer filer %s: %v", filer, err)
@@ -72,7 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
isSameFilerStore, err = ma.isSameFilerStore(f, filer)
}
- if !isSameFilerStore{
+ if !isSameFilerStore {
if prevTsNs, err := ma.readOffset(f, filer); err == nil {
lastTsNs = prevTsNs
}
@@ -83,11 +80,12 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
return
}
- changesSinceLastPersist++
- if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if lastPersistTime.Add(time.Minute).Before(time.Now()) {
if err := ma.updateOffset(f, filer, event.TsNs); err == nil {
+ if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
+ glog.V(0).Infof("sync with %s progressed to: %v", filer, time.Unix(0, event.TsNs).UTC())
+ }
lastPersistTime = time.Now()
- changesSinceLastPersist = 0
} else {
glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
}
@@ -159,6 +157,11 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err
value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer))
+ if err == ErrKvNotFound {
+ glog.Warningf("readOffset %s not found", peer)
+ return 0, nil
+ }
+
if err != nil {
return 0, fmt.Errorf("readOffset %s : %v", peer, err)
}