diff options
Diffstat (limited to 'weed/s3api/auth_credentials_subscribe.go')
| -rw-r--r-- | weed/s3api/auth_credentials_subscribe.go | 41 |
1 files changed, 5 insertions, 36 deletions
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index ea4b69550..05cce632a 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -1,13 +1,11 @@ package s3api import ( - "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "time" + "github.com/chrislusf/seaweedfs/weed/util" ) func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error { @@ -34,37 +32,8 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la return nil } - for { - err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: clientName, - PathPrefix: prefix, - SinceNs: lastTsNs, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } + return util.Retry("followIamChanges", func() error { + return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true) + }) - if err := processEventFn(resp); err != nil { - glog.Fatalf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs - } - }) - if err != nil { - glog.Errorf("subscribing filer meta change: %v", err) - } - time.Sleep(time.Second) - } } |
