diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2021-02-11 01:05:03 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-02-11 01:05:03 -0800 |
| commit | 0f426ce34d63deb6c8d4695c43a9e7c504e25005 (patch) | |
| tree | d0471963e986edb21f7a2e8ded89e9558b442c46 /weed/replication | |
| parent | 885ca34748926b5ffb6fe6ef4cfc5e225bdaa98c (diff) | |
| parent | 94eac4f00eb5fc364614b53718a4243141572215 (diff) | |
| download | seaweedfs-0f426ce34d63deb6c8d4695c43a9e7c504e25005.tar.xz seaweedfs-0f426ce34d63deb6c8d4695c43a9e7c504e25005.zip | |
Merge pull request #1801 from kmlebedev/recoveringRabbitMQ
Do reconnect to RabbitMQ
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 33 |
1 files changed, 24 insertions, 9 deletions
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 1d7fe520b..b16eec2e1 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -9,9 +9,12 @@ import ( "github.com/streadway/amqp" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + "gocloud.dev/pubsub/rabbitpubsub" "net/url" + "os" "path" "strings" + "time" // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" @@ -73,7 +76,8 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str } type GoCDKPubSubInput struct { - sub *pubsub.Subscription + sub *pubsub.Subscription + subURL string } func (k *GoCDKPubSubInput) GetName() string { @@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { topicUrl := configuration.GetString(prefix + "topic_url") - subURL := configuration.GetString(prefix + "sub_url") - glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) - sub, err := pubsub.OpenSubscription(context.Background(), subURL) + k.subURL = configuration.GetString(prefix + "sub_url") + glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL) + sub, err := pubsub.OpenSubscription(context.Background(), k.subURL) if err != nil { return err } @@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s return err } defer ch.Close() - _, err = ch.QueueInspect(getPath(subURL)) + _, err = ch.QueueInspect(getPath(k.subURL)) if err != nil { if strings.HasPrefix(err.Error(), "Exception (404) Reason") { - if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil { + if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil { return err } } else { @@ -111,13 +115,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s } func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { - msg, err := k.sub.Receive(context.Background()) + ctx := context.Background() + msg, err := k.sub.Receive(ctx) if err != nil { var conn *amqp.Connection if k.sub.As(&conn) && conn.IsClosed() { - glog.Fatalln(err) + conn.Close() + k.sub.Shutdown(ctx) + conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) + if err != nil { + glog.Error(err) + time.Sleep(time.Second) + return + } + k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil) + return } - return + // This is permanent cached sub err + glog.Fatal(err) } onFailureFn = func() { if msg.Nackable() { |
