author	Chris Lu <chris.lu@gmail.com>	2018-09-17 00:27:56 -0700
committer	Chris Lu <chris.lu@gmail.com>	2018-09-17 00:27:56 -0700
commit	788acdf5275dfb7610afe9144c17d9128d1737f6 (patch)
tree	55e6701210157981eecd8a4f72d8c9eefa01c457 /weed/replication
parent	865a0179369601e496d385e47bdbda93dcc3f243 (diff)
download	seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.tar.xz
	seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.zip
add WIP filer.replicate
Diffstat (limited to 'weed/replication')
-rw-r--r--	weed/replication/notification_kafka.go	77
-rw-r--r--	weed/replication/notifications.go	18
-rw-r--r--	weed/replication/replicator.go	31
-rw-r--r--	weed/replication/sink/filer_sink.go	163
-rw-r--r--	weed/replication/source/filer_source.go	97
5 files changed, 386 insertions, 0 deletions
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go
new file mode 100644
index 000000000..ce3b86ae9
--- /dev/null
+++ b/weed/replication/notification_kafka.go
@@ -0,0 +1,77 @@
+package replication
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &KafkaInput{
+ })
+}
+
+type KafkaInput struct {
+ topic string
+ consumer sarama.Consumer
+ messageChan chan *sarama.ConsumerMessage
+}
+
+func (k *KafkaInput) GetName() string {
+ return "kafka"
+}
+
+func (k *KafkaInput) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
+ glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+ return k.initialize(
+ configuration.GetStringSlice("hosts"),
+ configuration.GetString("topic"),
+ )
+}
+
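+// initialize connects to the Kafka brokers, then starts one goroutine per partition that funnels every message into a single channel.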
+func (k *KafkaInput) initialize(hosts []string, topic string) (err error) {
+ config := sarama.NewConfig()
+ config.Consumer.Return.Errors = true
+ k.consumer, err = sarama.NewConsumer(hosts, config)
+ if err != nil {
+ return fmt.Errorf("create kafka consumer %v: %v", hosts, err)
+ }
+ k.topic = topic
+ k.messageChan = make(chan *sarama.ConsumerMessage, 1)
+
+ partitions, err := k.consumer.Partitions(topic)
+ if err != nil {
+ return fmt.Errorf("list kafka partitions for topic %s: %v", topic, err)
+ }
+
+ for _, partition := range partitions {
+ partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
+ if err != nil {
+ return fmt.Errorf("consume kafka topic %s partition %d: %v", topic, partition, err)
+ }
+ go func() {
+ for {
+ select {
+ case err := <-partitionConsumer.Errors():
+ fmt.Println(err)
+ case msg := <-partitionConsumer.Messages():
+ k.messageChan <- msg
+ }
+ }
+ }()
+ }
+
+ return nil
+}
+
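+// ReceiveMessage blocks until the next Kafka message arrives and unmarshals its value into a filer EventNotification.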
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+
+ msg := <-k.messageChan
+
+ key = string(msg.Key)
+ message = &filer_pb.EventNotification{}
+ err = proto.Unmarshal(msg.Value, message)
+
+ return
+}
diff --git a/weed/replication/notifications.go b/weed/replication/notifications.go
new file mode 100644
index 000000000..ff40c3aad
--- /dev/null
+++ b/weed/replication/notifications.go
@@ -0,0 +1,18 @@
+package replication
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+type NotificationInput interface {
+ // GetName gets the name to locate the configuration in sync.toml file
+ GetName() string
+ // Initialize initializes the file store
+ Initialize(configuration util.Configuration) error
+ ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
+}
+
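+// NotificationInputs lists all available inputs; each implementation registers itself via init().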
+var (
+ NotificationInputs []NotificationInput
+)
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
new file mode 100644
index 000000000..c223021c2
--- /dev/null
+++ b/weed/replication/replicator.go
@@ -0,0 +1,31 @@
+package replication
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+type Replicator struct {
+ sink *sink.FilerSink
+}
+
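+// NewReplicator builds a Replicator backed by a single filer sink initialized from the given configuration.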
+func NewReplicator(config util.Configuration) *Replicator {
+
+ sink := &sink.FilerSink{}
+ sink.Initialize(config)
+
+ return &Replicator{
+ sink: sink,
+ }
+}
+
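+// Replicate applies one event to the sink: only an old entry means delete, only a new entry means create, and both mean update.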
+func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error {
+ if message.OldEntry != nil && message.NewEntry == nil {
+ return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks)
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ return r.sink.CreateEntry(message.NewEntry)
+ }
+ return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks)
+}
diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go
new file mode 100644
index 000000000..57ebeddff
--- /dev/null
+++ b/weed/replication/sink/filer_sink.go
@@ -0,0 +1,163 @@
+package sink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "fmt"
+ "strings"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "context"
+ "sync"
+)
+
+type ReplicationSink interface {
+ DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error
+ CreateEntry(entry *filer_pb.Entry) error
+ UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
+}
+
+type FilerSink struct {
+ grpcAddress string
+ id string
+ dir string
+}
+
+func (fs *FilerSink) Initialize(configuration util.Configuration) error {
+ return fs.initialize(
+ configuration.GetString("grpcAddress"),
+ configuration.GetString("id"),
+ configuration.GetString("directory"),
+ )
+}
+
+func (fs *FilerSink) initialize(grpcAddress string, id string, dir string) (err error) {
+ fs.grpcAddress = grpcAddress
+ fs.id = id
+ fs.dir = dir
+ return nil
+}
+
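+// DeleteEntry deletes the entry on the remote filer, optionally removing the underlying chunk data as well.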
+func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error {
+ return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir, name := filer2.FullPath(entry.Name).DirAndName()
+
+ request := &filer_pb.DeleteEntryRequest{
+ Directory: dir,
+ Name: name,
+ IsDirectory: entry.IsDirectory,
+ IsDeleteData: deleteIncludeChunks,
+ }
+
+ glog.V(1).Infof("delete entry: %v", request)
+ _, err := client.DeleteEntry(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("delete entry %s: %v", entry.Name, err)
+ return fmt.Errorf("delete entry %s: %v", entry.Name, err)
+ }
+
+ return nil
+ })
+}
+
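+// CreateEntry first copies the entry's chunks over, then creates the entry on the remote filer pointing at the copied chunks.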
+func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error {
+
+ replicatedChunks, err := replicateChunks(entry.Chunks)
+
+ if err != nil {
+ glog.V(0).Infof("replicate entry chunks %s: %v", entry.Name, err)
+ return fmt.Errorf("replicate entry chunks %s: %v", entry.Name, err)
+ }
+
+ return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir, name := filer2.FullPath(entry.Name).DirAndName()
+
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: entry.IsDirectory,
+ Attributes: entry.Attributes,
+ Chunks: replicatedChunks,
+ },
+ }
+
+ glog.V(1).Infof("create: %v", request)
+ if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ glog.V(0).Infof("create entry %s: %v", entry.Name, err)
+ return fmt.Errorf("create entry %s: %v", entry.Name, err)
+ }
+
+ return nil
+ })
+}
+
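+// UpdateEntry is not implemented yet in this WIP commit.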
+func (fs *FilerSink) UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error {
+ return nil
+}
+
+func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ grpcConnection, err := util.GrpcDial(fs.grpcAddress)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
+ }
+ defer grpcConnection.Close()
+
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+
+ return fn(client)
+}
+
+func volumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}
+
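+// replicateChunks copies all chunks of an entry concurrently, returning the chunk list rewritten to the new file ids.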
+func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
+ if len(sourceChunks) == 0 {
+ return
+ }
+ // writing into a pre-sized slice by index avoids a data race on append across goroutines
+ replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
+ var wg sync.WaitGroup
+ for i, s := range sourceChunks {
+ wg.Add(1)
+ go func(index int, chunk *filer_pb.FileChunk) {
+ defer wg.Done()
+ replicatedChunk, e := replicateOneChunk(chunk)
+ if e != nil {
+ err = e
+ }
+ replicatedChunks[index] = replicatedChunk
+ }(i, s)
+ }
+ wg.Wait()
+
+ return
+}
+
+func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
+
+ fileId, err := fetchAndWrite(sourceChunk)
+ if err != nil {
+ return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
+ }
+
+ return &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: sourceChunk.Offset,
+ Size: sourceChunk.Size,
+ Mtime: sourceChunk.Mtime,
+ ETag: sourceChunk.ETag,
+ SourceFileId: sourceChunk.FileId,
+ }, nil
+}
+
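+// fetchAndWrite is still a stub in this WIP commit; presumably it will fetch the chunk data from the source and write it to this cluster, returning the new file id.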
+func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
+
+ return
+}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
new file mode 100644
index 000000000..49c623815
--- /dev/null
+++ b/weed/replication/source/filer_source.go
@@ -0,0 +1,97 @@
+package source
+
+import (
+ "io"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "strings"
+ "context"
+)
+
+type ReplicationSource interface {
+ ReadPart(part string) io.ReadCloser
+}
+
+type FilerSource struct {
+ grpcAddress string
+ id string
+ dir string
+}
+
+func (fs *FilerSource) Initialize(configuration util.Configuration) error {
+ return fs.initialize(
+ configuration.GetString("grpcAddress"),
+ configuration.GetString("id"),
+ configuration.GetString("directory"),
+ )
+}
+
+func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) {
+ fs.grpcAddress = grpcAddress
+ fs.id = id
+ fs.dir = dir
+ return nil
+}
+
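+// ReadPart asks the filer for the volume locations of the given file id, then downloads the file from the first location.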
+func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) {
+
+ vid2Locations := make(map[string]*filer_pb.Locations)
+
+ vid := volumeId(part)
+
+ err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ glog.V(4).Infof("read lookup volume id locations: %v", vid)
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+
+ vid2Locations = resp.LocationsMap
+
+ return nil
+ })
+
+ if err != nil {
+ glog.V(1).Infof("replication lookup volume id %v: %v", vid, err)
+ return nil, fmt.Errorf("replication lookup volume id %v: %v", vid, err)
+ }
+
+ locations := vid2Locations[vid]
+
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(1).Infof("replication locate volume id %v: no locations found", vid)
+ return nil, fmt.Errorf("replication locate volume id %v: no locations found", vid)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+
+ _, readCloser, err = util.DownloadUrl(fileUrl)
+
+ return readCloser, err
+}
+
+func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ grpcConnection, err := util.GrpcDial(fs.grpcAddress)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
+ }
+ defer grpcConnection.Close()
+
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+
+ return fn(client)
+}
+
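+// volumeId extracts the volume id portion from a file id such as "3,01637037d6".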
+func volumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}
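
The commit does not yet wire these pieces together. A minimal consumption loop, assuming a registered and initialized NotificationInput (for example the kafka input above) bound to a variable named input, and a Replicator built with NewReplicator bound to replicator, might look like:

	// input and replicator are assumed to be configured elsewhere
	for {
		key, message, err := input.ReceiveMessage()
		if err != nil {
			glog.Errorf("receive message: %v", err)
			continue
		}
		if err = replicator.Replicate(key, message); err != nil {
			glog.Errorf("replicate %s: %v", key, err)
		}
	}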