diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/repl_util/replication_utli.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 1 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 5 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 35 |
4 files changed, 33 insertions, 10 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go index c5b8ab4e1..3514c6977 100644 --- a/weed/replication/repl_util/replication_utli.go +++ b/weed/replication/repl_util/replication_utli.go @@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index b062adcfe..a7392d856 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -78,6 +78,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) Collection: fs.collection, TtlSec: fs.ttlSec, DataCenter: fs.dataCenter, + DiskType: fs.diskType, Path: path, } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 600ff51f0..509f75116 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -25,6 +25,7 @@ type FilerSink struct { replication string collection string ttlSec int32 + diskType string dataCenter string grpcDialOption grpc.DialOption address string @@ -51,6 +52,7 @@ func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) configuration.GetString(prefix+"replication"), configuration.GetString(prefix+"collection"), configuration.GetInt(prefix+"ttlSec"), + configuration.GetString(prefix+"disk"), security.LoadClientTLS(util.GetViper(), "grpc.client"), false) } @@ -60,7 +62,7 @@ func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { } func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string, - replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) { + replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) { fs.address = address if fs.address == "" { fs.address = pb.GrpcAddressToServerAddress(grpcAddress) @@ -70,6 +72,7 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string, fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) + fs.diskType = diskType fs.grpcDialOption = grpcDialOption fs.writeChunkByFiler = writeChunkByFiler return nil diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 413c0e3cf..b16eec2e1 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -9,9 +9,12 @@ import ( "github.com/streadway/amqp" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + "gocloud.dev/pubsub/rabbitpubsub" "net/url" + "os" "path" "strings" + "time" // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" @@ -73,7 +76,8 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str } type GoCDKPubSubInput struct { - sub *pubsub.Subscription + sub *pubsub.Subscription + subURL string } func (k *GoCDKPubSubInput) GetName() string { @@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { topicUrl := configuration.GetString(prefix + "topic_url") - subURL := configuration.GetString(prefix + "sub_url") - glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) - sub, err := pubsub.OpenSubscription(context.Background(), subURL) + k.subURL = configuration.GetString(prefix + "sub_url") + glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL) + sub, err := pubsub.OpenSubscription(context.Background(), k.subURL) if err != nil { return err } @@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s return err } defer ch.Close() - _, err = ch.QueueInspect(getPath(subURL)) + _, err = ch.QueueInspect(getPath(k.subURL)) if err != nil { if strings.HasPrefix(err.Error(), "Exception (404) Reason") { - if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil { + if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil { return err } } else { @@ -111,9 +115,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s } func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { - msg, err := k.sub.Receive(context.Background()) + ctx := context.Background() + msg, err := k.sub.Receive(ctx) if err != nil { - return + var conn *amqp.Connection + if k.sub.As(&conn) && conn.IsClosed() { + conn.Close() + k.sub.Shutdown(ctx) + conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) + if err != nil { + glog.Error(err) + time.Sleep(time.Second) + return + } + k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil) + return + } + // This is permanent cached sub err + glog.Fatal(err) } onFailureFn = func() { if msg.Nackable() { |
