aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-09-05 19:57:44 +0500
committerGitHub <noreply@github.com>2022-09-05 07:57:44 -0700
commit9b2b7d4f5a187cb7d9f7c4833a5333847db35e8a (patch)
tree11af2832875fb47ba1a8cabae4a16bbc3e3d0930
parent916673ae24861fc7ad169dd3f580962d1a65b39a (diff)
downloadseaweedfs-9b2b7d4f5a187cb7d9f7c4833a5333847db35e8a.tar.xz
seaweedfs-9b2b7d4f5a187cb7d9f7c4833a5333847db35e8a.zip
avoid data race on GoCDKPubSub.topic (#3596)
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go33
1 files changed, 23 insertions, 10 deletions
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
index fcd8e61aa..131345f9c 100644
--- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/proto"
"net/url"
"path"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -49,33 +50,44 @@ func getPath(rawUrl string) string {
}
type GoCDKPubSub struct {
- topicURL string
- topic *pubsub.Topic
+ topicURL string
+ topic *pubsub.Topic
+ topicLock sync.RWMutex
}
func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
+func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) {
+ k.topicLock.Lock()
+ k.topic = topic
+ k.topicLock.Unlock()
+ k.doReconnect()
+}
+
func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection
+ k.topicLock.RLock()
+ defer k.topicLock.RUnlock()
if k.topic.As(&conn) {
- go func() {
- <-conn.NotifyClose(make(chan *amqp.Error))
- conn.Close()
+ go func(c *amqp.Connection) {
+ <-c.NotifyClose(make(chan *amqp.Error))
+ c.Close()
+ k.topicLock.RLock()
k.topic.Shutdown(context.Background())
+ k.topicLock.RUnlock()
for {
glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil {
- k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
- k.doReconnect()
+ k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil))
break
}
glog.Error(err)
time.Sleep(time.Second)
}
- }()
+ }(conn)
}
}
@@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
if err != nil {
glog.Fatalf("Failed to open topic: %v", err)
}
- k.topic = topic
- k.doReconnect()
+ k.setTopic(topic)
return nil
}
@@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
if err != nil {
return err
}
+ k.topicLock.RLock()
+ defer k.topicLock.RUnlock()
err = k.topic.Send(context.Background(), &pubsub.Message{
Body: bytes,
Metadata: map[string]string{"key": key},