aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go33
1 files changed, 23 insertions, 10 deletions
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
index fcd8e61aa..131345f9c 100644
--- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/proto"
"net/url"
"path"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -49,33 +50,44 @@ func getPath(rawUrl string) string {
}
type GoCDKPubSub struct {
- topicURL string
- topic *pubsub.Topic
+ topicURL string
+ topic *pubsub.Topic
+ topicLock sync.RWMutex
}
func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
+func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) {
+ k.topicLock.Lock()
+ k.topic = topic
+ k.topicLock.Unlock()
+ k.doReconnect()
+}
+
func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection
+ k.topicLock.RLock()
+ defer k.topicLock.RUnlock()
if k.topic.As(&conn) {
- go func() {
- <-conn.NotifyClose(make(chan *amqp.Error))
- conn.Close()
+ go func(c *amqp.Connection) {
+ <-c.NotifyClose(make(chan *amqp.Error))
+ c.Close()
+ k.topicLock.RLock()
k.topic.Shutdown(context.Background())
+ k.topicLock.RUnlock()
for {
glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil {
- k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
- k.doReconnect()
+ k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil))
break
}
glog.Error(err)
time.Sleep(time.Second)
}
- }()
+ }(conn)
}
}
@@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
if err != nil {
glog.Fatalf("Failed to open topic: %v", err)
}
- k.topic = topic
- k.doReconnect()
+ k.setTopic(topic)
return nil
}
@@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
if err != nil {
return err
}
+ k.topicLock.RLock()
+ defer k.topicLock.RUnlock()
err = k.topic.Send(context.Background(), &pubsub.Message{
Body: bytes,
Metadata: map[string]string{"key": key},