diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-02-09 11:37:07 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-09 11:37:07 -0800 |
| commit | 821c46edf10097200b986bd17dc01d3991cf57ff (patch) | |
| tree | ca181a9ef3c2f7e45cf0dbb40373b87717a9a636 /weed/replication | |
| parent | 15da5834e1a33d060924740ba195f6bcd79f2af2 (diff) | |
| parent | a6e8d606b47e5f3e8cd8a57d2769d6f1404fbc8f (diff) | |
| download | seaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.tar.xz seaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.zip | |
Merge branch 'master' into support_ssd_volume
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/repl_util/replication_utli.go | 10 | ||||
| -rw-r--r-- | weed/replication/replicator.go | 13 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 4 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 35 | ||||
| -rw-r--r-- | weed/replication/sink/localsink/local_incremental_sink.go | 17 | ||||
| -rw-r--r-- | weed/replication/sink/localsink/local_sink.go | 101 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 19 | ||||
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 2 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 103 | ||||
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 10 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 2 | ||||
| -rw-r--r-- | weed/replication/sub/notifications.go | 2 |
12 files changed, 289 insertions, 29 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go index 42777f4ad..c5b8ab4e1 100644 --- a/weed/replication/repl_util/replication_utli.go +++ b/weed/replication/repl_util/replication_utli.go @@ -17,9 +17,10 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer } var writeErr error + var shouldRetry bool for _, fileUrl := range fileUrls { - _, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { @@ -30,11 +31,12 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer break } } - - if err != nil { + if shouldRetry && err != nil { return err } - + if writeErr != nil { + return writeErr + } } return nil } diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index c4228434f..7688029e6 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "google.golang.org/grpc" "strings" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) return nil } - newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):]) + var dateKey string + if r.sink.GetName() == "local_incremental" { + var mTime int64 + if message.NewEntry != nil { + mTime = message.NewEntry.Attributes.Mtime + } else if message.OldEntry != nil { + mTime = message.OldEntry.Attributes.Mtime + } + dateKey = time.Unix(mTime, 0).Format("2006-01-02") + } + newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):]) glog.V(3).Infof("replicate %s => %s", key, newKey) key = newKey if message.OldEntry != nil && message.NewEntry == nil { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 6b51a7966..a7392d856 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -30,6 +30,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st replicatedChunk, e := fs.replicateOneChunk(chunk, path) if e != nil { err = e + return } replicatedChunks[index] = replicatedChunk }(sourceChunk, chunkIndex) @@ -98,6 +99,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if fs.writeChunkByFiler { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) + } glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index a69c9dc8e..9960634f6 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -3,6 +3,8 @@ package filersink import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" @@ -17,15 +19,17 @@ import ( ) type FilerSink struct { - filerSource *source.FilerSource - grpcAddress string - dir string - replication string - collection string - ttlSec int32 + filerSource *source.FilerSource + grpcAddress string + dir string + replication string + collection string + ttlSec int32 diskType string - dataCenter string - grpcDialOption grpc.DialOption + dataCenter string + grpcDialOption grpc.DialOption + address string + writeChunkByFiler bool } func init() { @@ -42,21 +46,27 @@ func (fs *FilerSink) GetSinkToDirectory() string { func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { return fs.DoInitialize( + "", configuration.GetString(prefix+"grpcAddress"), configuration.GetString(prefix+"directory"), configuration.GetString(prefix+"replication"), configuration.GetString(prefix+"collection"), configuration.GetInt(prefix+"ttlSec"), configuration.GetString(prefix+"disk"), - security.LoadClientTLS(util.GetViper(), "grpc.client")) + security.LoadClientTLS(util.GetViper(), "grpc.client"), + false) } func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { fs.filerSource = s } -func (fs *FilerSink) DoInitialize(grpcAddress string, dir string, - replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption) (err error) { +func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string, + replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) { + fs.address = address + if fs.address == "" { + fs.address = pb.GrpcAddressToServerAddress(grpcAddress) + } fs.grpcAddress = grpcAddress fs.dir = dir fs.replication = replication @@ -64,6 +74,7 @@ func (fs *FilerSink) DoInitialize(grpcAddress string, dir string, fs.ttlSec = int32(ttlSec) fs.diskType = diskType fs.grpcDialOption = grpcDialOption + fs.writeChunkByFiler = writeChunkByFiler return nil } @@ -209,7 +220,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent }) } -func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { +func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) if aErr != nil { return nil, nil, aErr diff --git a/weed/replication/sink/localsink/local_incremental_sink.go b/weed/replication/sink/localsink/local_incremental_sink.go new file mode 100644 index 000000000..a1d49e28a --- /dev/null +++ b/weed/replication/sink/localsink/local_incremental_sink.go @@ -0,0 +1,17 @@ +package localsink + +import ( + "github.com/chrislusf/seaweedfs/weed/replication/sink" +) + +type LocalIncSink struct { + LocalSink +} + +func (localincsink *LocalIncSink) GetName() string { + return "local_incremental" +} + +func init() { + sink.Sinks = append(sink.Sinks, &LocalIncSink{}) +} diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go new file mode 100644 index 000000000..21c625c3f --- /dev/null +++ b/weed/replication/sink/localsink/local_sink.go @@ -0,0 +1,101 @@ +package localsink + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +type LocalSink struct { + Dir string + filerSource *source.FilerSource +} + +func init() { + sink.Sinks = append(sink.Sinks, &LocalSink{}) +} + +func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) { + localsink.filerSource = s +} + +func (localsink *LocalSink) GetName() string { + return "local" +} + +func (localsink *LocalSink) isMultiPartEntry(key string) bool { + return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") +} + +func (localsink *LocalSink) initialize(dir string) error { + localsink.Dir = dir + return nil +} + +func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { + dir := configuration.GetString(prefix + "directory") + glog.V(4).Infof("sink.local.directory: %v", dir) + return localsink.initialize(dir) +} + +func (localsink *LocalSink) GetSinkToDirectory() string { + return localsink.Dir +} + +func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { + if localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Delete Entry key: %s", key) + if err := os.Remove(key); err != nil { + glog.V(0).Infof("remove entry key %s: %s", key, err) + } + return nil +} + +func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { + if entry.IsDirectory || localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Create Entry key: %s", key) + + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + + dir := filepath.Dir(key) + + if _, err := os.Stat(dir); os.IsNotExist(err) { + glog.V(4).Infof("Create Direcotry key: %s", dir) + if err = os.MkdirAll(dir, 0); err != nil { + return err + } + } + + writeFunc := func(data []byte) error { + writeErr := ioutil.WriteFile(key, data, 0) + return writeErr + } + + if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil { + return err + } + + return nil +} + +func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { + if localsink.isMultiPartEntry(key) { + return true, nil + } + glog.V(4).Infof("Update Entry key: %s", key) + // do delete and create + return false, nil +} diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index ff4f2eb26..3982360b0 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -25,19 +25,28 @@ type FilerSource struct { grpcAddress string grpcDialOption grpc.DialOption Dir string + address string + proxyByFiler bool } func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { return fs.DoInitialize( + "", configuration.GetString(prefix+"grpcAddress"), configuration.GetString(prefix+"directory"), + false, ) } -func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) { +func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) { + fs.address = address + if fs.address == "" { + fs.address = pb.GrpcAddressToServerAddress(grpcAddress) + } fs.grpcAddress = grpcAddress fs.Dir = dir fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + fs.proxyByFiler = readChunkFromFiler return nil } @@ -81,9 +90,13 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) return } -func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) { +func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { + + if fs.proxyByFiler { + return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId) + } - fileUrls, err := fs.LookupFileId(part) + fileUrls, err := fs.LookupFileId(fileId) if err != nil { return "", nil, nil, err } diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index 1dd386ba7..642834c72 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que return nil } -func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { // receive message result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 9726096e5..413c0e3cf 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -2,13 +2,17 @@ package sub import ( "context" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" + "github.com/streadway/amqp" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + "net/url" + "path" + "strings" + // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" @@ -19,6 +23,55 @@ func init() { NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{}) } +func getPath(rawUrl string) string { + parsedUrl, _ := url.Parse(rawUrl) + return path.Join(parsedUrl.Host, parsedUrl.Path) +} + +func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl string) error { + exchangeName := getPath(exchangeUrl) + queueName := getPath(queueUrl) + exchangeNameDLX := "DLX." + exchangeName + queueNameDLX := "DLX." + queueName + ch, err := conn.Channel() + if err != nil { + glog.Error(err) + return err + } + defer ch.Close() + if err := ch.ExchangeDeclare( + exchangeNameDLX, "fanout", false, false, false, false, nil); err != nil { + glog.Error(err) + return err + } + if err := ch.ExchangeDeclare( + exchangeName, "fanout", false, false, false, false, nil); err != nil { + glog.Error(err) + return err + } + if _, err := ch.QueueDeclare( + queueName, false, false, false, false, + amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil { + glog.Error(err) + return err + } + if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil { + glog.Error(err) + return err + } + if _, err := ch.QueueDeclare( + queueNameDLX, false, false, false, false, + amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil { + glog.Error(err) + return err + } + if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil { + glog.Error(err) + return err + } + return nil +} + type GoCDKPubSubInput struct { sub *pubsub.Subscription } @@ -28,23 +81,65 @@ func (k *GoCDKPubSubInput) GetName() string { } func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { + topicUrl := configuration.GetString(prefix + "topic_url") subURL := configuration.GetString(prefix + "sub_url") glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) sub, err := pubsub.OpenSubscription(context.Background(), subURL) if err != nil { return err } + var conn *amqp.Connection + if sub.As(&conn) { + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + _, err = ch.QueueInspect(getPath(subURL)) + if err != nil { + if strings.HasPrefix(err.Error(), "Exception (404) Reason") { + if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil { + return err + } + } else { + return err + } + } + } k.sub = sub return nil } -func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg, err := k.sub.Receive(context.Background()) + if err != nil { + return + } + onFailureFn = func() { + if msg.Nackable() { + isRedelivered := false + var delivery amqp.Delivery + if msg.As(&delivery) { + isRedelivered = delivery.Redelivered + glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered) + } + if isRedelivered { + if err := delivery.Nack(false, false); err != nil { + glog.Error(err) + } + } else { + msg.Nack() + } + } + } + onSuccessFn = func() { + msg.Ack() + } key = msg.Metadata["key"] message = &filer_pb.EventNotification{} err = proto.Unmarshal(msg.Body, message) if err != nil { - return "", nil, err + return "", nil, onSuccessFn, onFailureFn, err } - return key, message, nil + return key, message, onSuccessFn, onFailureFn, nil } diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index a950bb42b..f7c767d4a 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { k.messageChan <- m - m.Ack() }) return err } -func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { m := <-k.messageChan + onSuccessFn = func() { + m.Ack() + } + onFailureFn = func() { + m.Nack() + } + // process the message key = m.Attributes["key"] message = &filer_pb.EventNotification{} diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index fa9cfad9b..622a759ea 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, return nil } -func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg := <-k.messageChan diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go index 8a2668f98..d5a910db9 100644 --- a/weed/replication/sub/notifications.go +++ b/weed/replication/sub/notifications.go @@ -10,7 +10,7 @@ type NotificationInput interface { GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration, prefix string) error - ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) + ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) } var ( |
