aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/repl_util/replication_utli.go2
-rw-r--r--weed/replication/sink/filersink/fetch_write.go1
-rw-r--r--weed/replication/sink/filersink/filer_sink.go5
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go35
4 files changed, 33 insertions, 10 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index c5b8ab4e1..3514c6977 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index b062adcfe..a7392d856 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -78,6 +78,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
Collection: fs.collection,
TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter,
+ DiskType: fs.diskType,
Path: path,
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 600ff51f0..509f75116 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -25,6 +25,7 @@ type FilerSink struct {
replication string
collection string
ttlSec int32
+ diskType string
dataCenter string
grpcDialOption grpc.DialOption
address string
@@ -51,6 +52,7 @@ func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string)
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
+ configuration.GetString(prefix+"disk"),
security.LoadClientTLS(util.GetViper(), "grpc.client"),
false)
}
@@ -60,7 +62,7 @@ func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
}
func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
- replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
+ replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
fs.address = address
if fs.address == "" {
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
@@ -70,6 +72,7 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
+ fs.diskType = diskType
fs.grpcDialOption = grpcDialOption
fs.writeChunkByFiler = writeChunkByFiler
return nil
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 413c0e3cf..b16eec2e1 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -9,9 +9,12 @@ import (
"github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+ "gocloud.dev/pubsub/rabbitpubsub"
"net/url"
+ "os"
"path"
"strings"
+ "time"
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
@@ -73,7 +76,8 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
}
type GoCDKPubSubInput struct {
- sub *pubsub.Subscription
+ sub *pubsub.Subscription
+ subURL string
}
func (k *GoCDKPubSubInput) GetName() string {
@@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
topicUrl := configuration.GetString(prefix + "topic_url")
- subURL := configuration.GetString(prefix + "sub_url")
- glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
- sub, err := pubsub.OpenSubscription(context.Background(), subURL)
+ k.subURL = configuration.GetString(prefix + "sub_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
+ sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
if err != nil {
return err
}
@@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return err
}
defer ch.Close()
- _, err = ch.QueueInspect(getPath(subURL))
+ _, err = ch.QueueInspect(getPath(k.subURL))
if err != nil {
if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
- if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
+ if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil {
return err
}
} else {
@@ -111,9 +115,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
}
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
- msg, err := k.sub.Receive(context.Background())
+ ctx := context.Background()
+ msg, err := k.sub.Receive(ctx)
if err != nil {
- return
+ var conn *amqp.Connection
+ if k.sub.As(&conn) && conn.IsClosed() {
+ conn.Close()
+ k.sub.Shutdown(ctx)
+ conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
+ if err != nil {
+ glog.Error(err)
+ time.Sleep(time.Second)
+ return
+ }
+ k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil)
+ return
+ }
+ // This is permanent cached sub err
+ glog.Fatal(err)
}
onFailureFn = func() {
if msg.Nackable() {