aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer2/filer.go2
-rw-r--r--weed/filer2/filerstore.go5
-rw-r--r--weed/filer2/leveldb2/leveldb2_local_store.go47
-rw-r--r--weed/filer2/meta_aggregator.go38
-rw-r--r--weed/filer2/meta_replay.go37
5 files changed, 125 insertions, 4 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 7747c9af6..8c2b1b33a 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -68,7 +68,7 @@ func (f *Filer) AggregateFromPeers(self string, filers []string) {
filers = append(filers, self)
}
f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
- f.MetaAggregator.StartLoopSubscribe(time.Now().UnixNano())
+ f.MetaAggregator.StartLoopSubscribe(f, self, time.Now().UnixNano())
}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index f36c74f14..24464b6a5 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -29,6 +29,11 @@ type FilerStore interface {
Shutdown()
}
+type FilerLocalStore interface {
+ UpdateOffset(filer string, lastTsNs int64) error
+ ReadOffset(filer string) (lastTsNs int64, err error)
+}
+
type FilerStoreWrapper struct {
actualStore FilerStore
}
diff --git a/weed/filer2/leveldb2/leveldb2_local_store.go b/weed/filer2/leveldb2/leveldb2_local_store.go
new file mode 100644
index 000000000..86aa54471
--- /dev/null
+++ b/weed/filer2/leveldb2/leveldb2_local_store.go
@@ -0,0 +1,47 @@
+package leveldb
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &LevelDB2Store{})
+}
+
+var (
+ _ = filer2.FilerLocalStore(&LevelDB2Store{})
+)
+
+func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
+
+ value := make([]byte, 8)
+ util.Uint64toBytes(value, uint64(lastTsNs))
+
+ err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
+
+ if err != nil {
+ return fmt.Errorf("UpdateOffset %s : %v", filer, err)
+ }
+
+ println("UpdateOffset", filer, "lastTsNs", lastTsNs)
+
+ return nil
+}
+
+func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
+
+ value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
+
+ if err != nil {
+ return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
+ }
+
+ lastTsNs = int64(util.BytesToUint64(value))
+
+ println("ReadOffset", filer, "lastTsNs", lastTsNs)
+
+ return
+}
diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go
index 2f707b921..478337ae7 100644
--- a/weed/filer2/meta_aggregator.go
+++ b/weed/filer2/meta_aggregator.go
@@ -37,13 +37,42 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
return t
}
-func (ma *MetaAggregator) StartLoopSubscribe(lastTsNs int64) {
+func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string, lastTsNs int64) {
for _, filer := range ma.filers {
- go ma.subscribeToOneFiler(filer, lastTsNs)
+ go ma.subscribeToOneFiler(f, self, filer, lastTsNs)
}
}
-func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
+func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self string, lastTsNs int64) {
+
+ var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
+ lastPersistTime := time.Now()
+ changesSinceLastPersist := 0
+
+ MaxChangeLimit := 100
+
+ if localStore, ok := f.store.actualStore.(FilerLocalStore); ok {
+ if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
+ lastTsNs = prevTsNs
+ }
+ if self != filer {
+ maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
+ if err := Replay(f.store.actualStore, event); err != nil {
+ glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
+ return
+ }
+ changesSinceLastPersist++
+ if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
+ lastPersistTime = time.Now()
+ changesSinceLastPersist = 0
+ } else {
+ glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
+ }
+ }
+ }
+ }
+ }
processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
data, err := proto.Marshal(event)
@@ -54,6 +83,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
dir := event.Directory
// println("received meta change", dir, "size", len(data))
ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
+ if maybeReplicateMetadataChange != nil {
+ maybeReplicateMetadataChange(event)
+ }
return nil
}
diff --git a/weed/filer2/meta_replay.go b/weed/filer2/meta_replay.go
new file mode 100644
index 000000000..d9cdaa76a
--- /dev/null
+++ b/weed/filer2/meta_replay.go
@@ -0,0 +1,37 @@
+package filer2
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ var oldPath util.FullPath
+ var newEntry *Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil {
+ return err
+ }
+ }
+
+ if message.NewEntry != nil {
+ dir := resp.Directory
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = FromPbEntry(dir, message.NewEntry)
+ if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}