aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/meta_aggregator.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/meta_aggregator.go')
-rw-r--r--weed/filer/meta_aggregator.go76
1 files changed, 55 insertions, 21 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 18049ee04..7a5329d74 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"io"
"sync"
"time"
@@ -64,31 +65,33 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
MaxChangeLimit := 100
- if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
- if self != filer {
+ isSameFilerStore, err := ma.isSameFilerStore(f, filer)
+ for err != nil {
+ glog.V(0).Infof("connecting to peer filer %s: %v", filer, err)
+ time.Sleep(1357 * time.Millisecond)
+ isSameFilerStore, err = ma.isSameFilerStore(f, filer)
+ }
- if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
- lastTsNs = prevTsNs
- }
+ if !isSameFilerStore{
+ if prevTsNs, err := ma.readOffset(f, filer); err == nil {
+ lastTsNs = prevTsNs
+ }
- glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
- maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
- if err := Replay(f.Store.ActualStore, event); err != nil {
- glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
- return
- }
- changesSinceLastPersist++
- if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
- if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
- lastPersistTime = time.Now()
- changesSinceLastPersist = 0
- } else {
- glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
- }
+ glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
+ maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
+ if err := Replay(f.Store.ActualStore, event); err != nil {
+ glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
+ return
+ }
+ changesSinceLastPersist++
+ if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if err := ma.updateOffset(f, filer, event.TsNs); err == nil {
+ lastPersistTime = time.Now()
+ changesSinceLastPersist = 0
+ } else {
+ glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
}
}
- } else {
- glog.V(0).Infof("skipping following self: %v", self)
}
}
@@ -151,3 +154,34 @@ func (ma *MetaAggregator) isSameFilerStore(f *Filer, peer string) (isSame bool,
})
return
}
+
+func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err error) {
+
+ value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer))
+
+ if err != nil {
+ return 0, fmt.Errorf("readOffset %s : %v", peer, err)
+ }
+
+ lastTsNs = int64(util.BytesToUint64(value))
+
+ glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
+
+ return
+}
+
+func (ma *MetaAggregator) updateOffset(f *Filer, peer string, lastTsNs int64) (err error) {
+
+ value := make([]byte, 8)
+ util.Uint64toBytes(value, uint64(lastTsNs))
+
+ err = f.Store.KvPut(context.Background(), []byte("meta"+peer), value)
+
+ if err != nil {
+ return fmt.Errorf("updateOffset %s : %v", peer, err)
+ }
+
+ glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
+
+ return
+}