aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-02-11 01:05:03 -0800
committerGitHub <noreply@github.com>2021-02-11 01:05:03 -0800
commit0f426ce34d63deb6c8d4695c43a9e7c504e25005 (patch)
treed0471963e986edb21f7a2e8ded89e9558b442c46
parent885ca34748926b5ffb6fe6ef4cfc5e225bdaa98c (diff)
parent94eac4f00eb5fc364614b53718a4243141572215 (diff)
downloadseaweedfs-0f426ce34d63deb6c8d4695c43a9e7c504e25005.tar.xz
seaweedfs-0f426ce34d63deb6c8d4695c43a9e7c504e25005.zip
Merge pull request #1801 from kmlebedev/recoveringRabbitMQ
Do reconnect to RabbitMQ
-rw-r--r--weed/filer/filer_notify.go5
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go35
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go33
3 files changed, 62 insertions, 11 deletions
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index f3a795ad0..c461a82b8 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
if notification.Queue != nil {
glog.V(3).Infof("notifying entry update %v", fullpath)
- notification.Queue.SendMessage(fullpath, eventNotification)
+ if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil {
+ // throw message
+ glog.Error(err)
+ }
}
f.logMetaEvent(ctx, fullpath, eventNotification)
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
index 1ae102509..01c4d901f 100644
--- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -17,10 +17,14 @@ package gocdk_pub_sub
import (
"context"
"fmt"
-
"github.com/golang/protobuf/proto"
+ "github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+ "gocloud.dev/pubsub/rabbitpubsub"
+ "net/url"
+ "path"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
@@ -29,12 +33,18 @@ import (
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
_ "gocloud.dev/pubsub/rabbitpubsub"
+ "os"
)
func init() {
notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
}
+func getPath(rawUrl string) string {
+ parsedUrl, _ := url.Parse(rawUrl)
+ return path.Join(parsedUrl.Host, parsedUrl.Path)
+}
+
type GoCDKPubSub struct {
topicURL string
topic *pubsub.Topic
@@ -44,6 +54,28 @@ func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
+func (k *GoCDKPubSub) doReconnect() {
+ var conn *amqp.Connection
+ if k.topic.As(&conn) {
+ go func() {
+ <-conn.NotifyClose(make(chan *amqp.Error))
+ conn.Close()
+ k.topic.Shutdown(context.Background())
+ for {
+ glog.Info("Try reconnect")
+ conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
+ if err == nil {
+ k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
+ k.doReconnect()
+ break
+ }
+ glog.Error(err)
+ time.Sleep(time.Second)
+ }
+ }()
+ }
+}
+
func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
k.topicURL = configuration.GetString(prefix + "topic_url")
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
@@ -52,6 +84,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
glog.Fatalf("Failed to open topic: %v", err)
}
k.topic = topic
+ k.doReconnect()
return nil
}
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 1d7fe520b..b16eec2e1 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -9,9 +9,12 @@ import (
"github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+ "gocloud.dev/pubsub/rabbitpubsub"
"net/url"
+ "os"
"path"
"strings"
+ "time"
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
@@ -73,7 +76,8 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
}
type GoCDKPubSubInput struct {
- sub *pubsub.Subscription
+ sub *pubsub.Subscription
+ subURL string
}
func (k *GoCDKPubSubInput) GetName() string {
@@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
topicUrl := configuration.GetString(prefix + "topic_url")
- subURL := configuration.GetString(prefix + "sub_url")
- glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
- sub, err := pubsub.OpenSubscription(context.Background(), subURL)
+ k.subURL = configuration.GetString(prefix + "sub_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
+ sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
if err != nil {
return err
}
@@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return err
}
defer ch.Close()
- _, err = ch.QueueInspect(getPath(subURL))
+ _, err = ch.QueueInspect(getPath(k.subURL))
if err != nil {
if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
- if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
+ if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil {
return err
}
} else {
@@ -111,13 +115,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
}
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
- msg, err := k.sub.Receive(context.Background())
+ ctx := context.Background()
+ msg, err := k.sub.Receive(ctx)
if err != nil {
var conn *amqp.Connection
if k.sub.As(&conn) && conn.IsClosed() {
- glog.Fatalln(err)
+ conn.Close()
+ k.sub.Shutdown(ctx)
+ conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
+ if err != nil {
+ glog.Error(err)
+ time.Sleep(time.Second)
+ return
+ }
+ k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil)
+ return
}
- return
+ // This is permanent cached sub err
+ glog.Fatal(err)
}
onFailureFn = func() {
if msg.Nackable() {