aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-05 23:25:03 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-05 23:25:03 -0700
commit8acd7146006745a16079b1347e8f772b62762332 (patch)
tree5763c41cc2246382fa6084f97056bae8bf2490a1
parentd741ed66db44c2058e30c260e2a75eeec7308bd1 (diff)
downloadseaweedfs-8acd7146006745a16079b1347e8f772b62762332.tar.xz
seaweedfs-8acd7146006745a16079b1347e8f772b62762332.zip
apply meta changes only if store is different
-rw-r--r--weed/filer/filerstore.go5
-rw-r--r--weed/filer/leveldb2/leveldb2_local_store.go43
-rw-r--r--weed/filer/meta_aggregator.go76
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go20
4 files changed, 64 insertions, 80 deletions
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 518212437..d313b7ba3 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -42,11 +42,6 @@ type FilerStore interface {
Shutdown()
}
-type FilerLocalStore interface {
- UpdateOffset(filer string, lastTsNs int64) error
- ReadOffset(filer string) (lastTsNs int64, err error)
-}
-
type FilerStoreWrapper struct {
ActualStore FilerStore
}
diff --git a/weed/filer/leveldb2/leveldb2_local_store.go b/weed/filer/leveldb2/leveldb2_local_store.go
deleted file mode 100644
index faae25c45..000000000
--- a/weed/filer/leveldb2/leveldb2_local_store.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package leveldb
-
-import (
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- _ = filer.FilerLocalStore(&LevelDB2Store{})
-)
-
-func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
-
- value := make([]byte, 8)
- util.Uint64toBytes(value, uint64(lastTsNs))
-
- err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
-
- if err != nil {
- return fmt.Errorf("UpdateOffset %s : %v", filer, err)
- }
-
- println("UpdateOffset", filer, "lastTsNs", lastTsNs)
-
- return nil
-}
-
-func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
-
- value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
-
- if err != nil {
- return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
- }
-
- lastTsNs = int64(util.BytesToUint64(value))
-
- println("ReadOffset", filer, "lastTsNs", lastTsNs)
-
- return
-}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 18049ee04..7a5329d74 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"io"
"sync"
"time"
@@ -64,31 +65,33 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
MaxChangeLimit := 100
- if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
- if self != filer {
+ isSameFilerStore, err := ma.isSameFilerStore(f, filer)
+ for err != nil {
+ glog.V(0).Infof("connecting to peer filer %s: %v", filer, err)
+ time.Sleep(1357 * time.Millisecond)
+ isSameFilerStore, err = ma.isSameFilerStore(f, filer)
+ }
- if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
- lastTsNs = prevTsNs
- }
+ if !isSameFilerStore{
+ if prevTsNs, err := ma.readOffset(f, filer); err == nil {
+ lastTsNs = prevTsNs
+ }
- glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
- maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
- if err := Replay(f.Store.ActualStore, event); err != nil {
- glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
- return
- }
- changesSinceLastPersist++
- if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
- if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
- lastPersistTime = time.Now()
- changesSinceLastPersist = 0
- } else {
- glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
- }
+ glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
+ maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
+ if err := Replay(f.Store.ActualStore, event); err != nil {
+ glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
+ return
+ }
+ changesSinceLastPersist++
+ if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if err := ma.updateOffset(f, filer, event.TsNs); err == nil {
+ lastPersistTime = time.Now()
+ changesSinceLastPersist = 0
+ } else {
+ glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
}
}
- } else {
- glog.V(0).Infof("skipping following self: %v", self)
}
}
@@ -151,3 +154,34 @@ func (ma *MetaAggregator) isSameFilerStore(f *Filer, peer string) (isSame bool,
})
return
}
+
+func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err error) {
+
+ value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer))
+
+ if err != nil {
+ return 0, fmt.Errorf("readOffset %s : %v", peer, err)
+ }
+
+ lastTsNs = int64(util.BytesToUint64(value))
+
+ glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
+
+ return
+}
+
+func (ma *MetaAggregator) updateOffset(f *Filer, peer string, lastTsNs int64) (err error) {
+
+ value := make([]byte, 8)
+ util.Uint64toBytes(value, uint64(lastTsNs))
+
+ err = f.Store.KvPut(context.Background(), []byte("meta"+peer), value)
+
+ if err != nil {
+ return fmt.Errorf("updateOffset %s : %v", peer, err)
+ }
+
+ glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
+
+ return
+}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 9ba45edfe..72e2b355b 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -63,21 +63,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- if _, ok := fs.filer.Store.ActualStore.(filer.FilerLocalStore); ok {
- // println("reading from persisted logs ...")
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
+ // println("reading from persisted logs ...")
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
- glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
}
+ glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
// println("reading from in memory logs ...")
- err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()