diff options
Diffstat (limited to 'weed/filesys/meta_cache/meta_cache_subscribe.go')
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_subscribe.go | 41 |
1 files changed, 4 insertions, 37 deletions
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index 747ac3cb9..c650b8024 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -2,12 +2,9 @@ package meta_cache import ( "context" - "fmt" - "io" - "time" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -62,38 +59,8 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } - for { - err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "mount", - PathPrefix: dir, - SinceNs: lastTsNs, - Signature: selfSignature, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } + return util.Retry("followMetaUpdates", func() error { + return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true) + }) - if err := processEventFn(resp); err != nil { - glog.Fatalf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs - } - }) - if err != nil { - glog.Errorf("subscribing filer meta change: %v", err) - } - time.Sleep(time.Second) - } } |
