diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-17 00:27:56 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-17 00:27:56 -0700 |
| commit | 788acdf5275dfb7610afe9144c17d9128d1737f6 (patch) | |
| tree | 55e6701210157981eecd8a4f72d8c9eefa01c457 /weed/replication | |
| parent | 865a0179369601e496d385e47bdbda93dcc3f243 (diff) | |
| download | seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.tar.xz seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.zip | |
add WIP filer.replicate
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/notification_kafka.go | 77 | ||||
| -rw-r--r-- | weed/replication/notifications.go | 18 | ||||
| -rw-r--r-- | weed/replication/replicator.go | 31 | ||||
| -rw-r--r-- | weed/replication/sink/filer_sink.go | 163 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 97 |
5 files changed, 386 insertions, 0 deletions
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go new file mode 100644 index 000000000..ce3b86ae9 --- /dev/null +++ b/weed/replication/notification_kafka.go @@ -0,0 +1,77 @@ +package replication + +import ( + "github.com/Shopify/sarama" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + NotificationInputs = append(NotificationInputs, &KafkaInput{ + }) +} + +type KafkaInput struct { + topic string + consumer sarama.Consumer + messageChan chan *sarama.ConsumerMessage +} + +func (k *KafkaInput) GetName() string { + return "kafka" +} + +func (k *KafkaInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) + return k.initialize( + configuration.GetStringSlice("hosts"), + configuration.GetString("topic"), + ) +} + +func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { + config := sarama.NewConfig() + config.Consumer.Return.Errors = true + k.consumer, err = sarama.NewConsumer(hosts, config) + k.topic = topic + k.messageChan = make(chan *sarama.ConsumerMessage, 1) + + partitions, err := k.consumer.Partitions(topic) + if err != nil { + panic(err) + } + + for _, partition := range partitions { + partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + panic(err) + } + go func() { + for { + select { + case err := <-partitionConsumer.Errors(): + fmt.Println(err) + case msg := <-partitionConsumer.Messages(): + k.messageChan <- msg + } + } + }() + } + + return nil +} + +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + msg := <-k.messageChan + + key = string(msg.Key) + message = &filer_pb.EventNotification{} + err = proto.Unmarshal(msg.Value, message) + + return +} diff --git a/weed/replication/notifications.go b/weed/replication/notifications.go new file mode 100644 index 000000000..ff40c3aad --- /dev/null +++ b/weed/replication/notifications.go @@ -0,0 +1,18 @@ +package replication + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +type NotificationInput interface { + // GetName gets the name to locate the configuration in sync.toml file + GetName() string + // Initialize initializes the file store + Initialize(configuration util.Configuration) error + ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) +} + +var ( + NotificationInputs []NotificationInput +) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go new file mode 100644 index 000000000..c223021c2 --- /dev/null +++ b/weed/replication/replicator.go @@ -0,0 +1,31 @@ +package replication + +import ( + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +type Replicator struct { + sink *sink.FilerSink +} + +func NewReplicator(config util.Configuration) *Replicator { + + sink := &sink.FilerSink{} + sink.Initialize(config) + + return &Replicator{ + sink: sink, + } +} + +func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error { + if message.OldEntry != nil && message.NewEntry == nil { + return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks) + } + if message.OldEntry == nil && message.NewEntry != nil { + return r.sink.CreateEntry(message.NewEntry) + } + return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks) +} diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go new file mode 100644 index 000000000..57ebeddff --- /dev/null +++ b/weed/replication/sink/filer_sink.go @@ -0,0 +1,163 @@ +package sink + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "fmt" + "strings" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "context" + "sync" +) + +type ReplicationSink interface { + DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error + CreateEntry(entry *filer_pb.Entry) error + UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error +} + +type FilerSink struct { + grpcAddress string + id string + dir string +} + +func (fs *FilerSink) Initialize(configuration util.Configuration) error { + return fs.initialize( + configuration.GetString("grpcAddress"), + configuration.GetString("id"), + configuration.GetString("directory"), + ) +} + +func (fs *FilerSink) initialize(grpcAddress string, id string, dir string) (err error) { + fs.grpcAddress = grpcAddress + fs.id = id + fs.dir = dir + return nil +} + +func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error { + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir, name := filer2.FullPath(entry.Name).DirAndName() + + request := &filer_pb.DeleteEntryRequest{ + Directory: dir, + Name: name, + IsDirectory: entry.IsDirectory, + IsDeleteData: deleteIncludeChunks, + } + + glog.V(1).Infof("delete entry: %v", request) + _, err := client.DeleteEntry(context.Background(), request) + if err != nil { + glog.V(0).Infof("delete entry %s: %v", entry.Name, err) + return fmt.Errorf("delete entry %s: %v", entry.Name, err) + } + + return nil + }) +} + +func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { + + replicatedChunks, err := replicateChunks(entry.Chunks) + + if err != nil { + glog.V(0).Infof("replicate entry chunks %s: %v", entry.Name, err) + return fmt.Errorf("replicate entry chunks %s: %v", entry.Name, err) + } + + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir, name := filer2.FullPath(entry.Name).DirAndName() + + request := &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: entry.IsDirectory, + Attributes: entry.Attributes, + Chunks: replicatedChunks, + }, + } + + glog.V(1).Infof("create: %v", request) + if _, err := client.CreateEntry(context.Background(), request); err != nil { + glog.V(0).Infof("create entry %s: %v", entry.Name, err) + return fmt.Errorf("create entry %s: %v", entry.Name, err) + } + + return nil + }) +} + +func (fs *FilerSink) UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { + return nil +} + +func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := util.GrpcDial(fs.grpcAddress) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +} + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} + +func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { + if len(sourceChunks) == 0 { + return + } + var wg sync.WaitGroup + for _, s := range sourceChunks { + wg.Add(1) + go func(chunk *filer_pb.FileChunk) { + defer wg.Done() + replicatedChunk, e := replicateOneChunk(chunk) + if e != nil { + err = e + } + replicatedChunks = append(replicatedChunks, replicatedChunk) + }(s) + } + wg.Wait() + + return +} + +func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { + + fileId, err := fetchAndWrite(sourceChunk) + if err != nil { + return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) + } + + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: sourceChunk.Offset, + Size: sourceChunk.Size, + Mtime: sourceChunk.Mtime, + ETag: sourceChunk.ETag, + SourceFileId: sourceChunk.FileId, + }, nil +} + +func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { + + return +} diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go new file mode 100644 index 000000000..49c623815 --- /dev/null +++ b/weed/replication/source/filer_source.go @@ -0,0 +1,97 @@ +package source + +import ( + "io" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "strings" + "context" +) + +type ReplicationSource interface { + ReadPart(part string) io.ReadCloser +} + +type FilerSource struct { + grpcAddress string + id string + dir string +} + +func (fs *FilerSource) Initialize(configuration util.Configuration) error { + return fs.initialize( + configuration.GetString("grpcAddress"), + configuration.GetString("id"), + configuration.GetString("directory"), + ) +} + +func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) { + fs.grpcAddress = grpcAddress + fs.id = id + fs.dir = dir + return nil +} + +func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) { + + vid2Locations := make(map[string]*filer_pb.Locations) + + vid := volumeId(part) + + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + glog.V(4).Infof("read lookup volume id locations: %v", vid) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + vid2Locations = resp.LocationsMap + + return nil + }) + + if err != nil { + glog.V(1).Infof("replication lookup volume id: %v", vid, err) + return nil, fmt.Errorf("replicationlookup volume id %v: %v", vid, err) + } + + locations := vid2Locations[vid] + + if locations == nil || len(locations.Locations) == 0 { + glog.V(1).Infof("replication locate volume id: %v", vid, err) + return nil, fmt.Errorf("replication locate volume id %v: %v", vid, err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) + + _, readCloser, err = util.DownloadUrl(fileUrl) + + return readCloser, err +} + +func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := util.GrpcDial(fs.grpcAddress) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +} + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} |
