aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/meta_aggregator.go92
1 files changed, 53 insertions, 39 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 83c8a945d..863f5c3e9 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -57,7 +57,7 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
if update.IsAdd {
// every filer should subscribe to a new filer
if ma.setActive(address, true) {
- go ma.subscribeToOneFiler(ma.filer, ma.self, address)
+ go ma.loopSubscribeToOnefiler(ma.filer, ma.self, address)
}
} else {
ma.setActive(address, false)
@@ -89,7 +89,21 @@ func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
return count > 0 && isActive
}
-func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {
+func (ma *MetaAggregator) loopSubscribeToOnefiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {
+ for {
+ err := ma.doSubscribeToOneFiler(f, self, peer)
+ if !ma.isActive(peer) {
+ glog.V(0).Infof("stop subscribing remote %s meta change", peer)
+ return
+ }
+ if err != nil {
+ glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
+ }
+ time.Sleep(1733 * time.Millisecond)
+ }
+}
+
+func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) error {
/*
Each filer reads the "filer.store.id", which is the store's signature when filer starts.
@@ -117,6 +131,15 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
lastTsNs = 0
if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
lastTsNs = prevTsNs
+ defer func(prevTsNs int64) {
+ if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
+ if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
+ glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ } else {
+ glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ }
+ }
+ }(prevTsNs)
}
glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
@@ -160,48 +183,39 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
return nil
}
- for {
- glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
- err := pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "filer:" + string(self),
- PathPrefix: "/",
- SinceNs: lastTsNs,
- ClientId: int32(ma.filer.UniqueFileId),
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
+ glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
+ err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "filer:" + string(self),
+ PathPrefix: "/",
+ SinceNs: lastTsNs,
+ ClientId: int32(ma.filer.UniqueFileId),
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("process %v: %v", resp, err)
- }
- lastTsNs = resp.TsNs
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
- f.onMetadataChangeEvent(resp)
+ f.onMetadataChangeEvent(resp)
- }
- })
- if !ma.isActive(peer) {
- glog.V(0).Infof("stop subscribing remote %s meta change", peer)
- return
- }
- if err != nil {
- glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
- time.Sleep(1733 * time.Millisecond)
}
- }
+ })
+ return err
}
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {