aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/notification_kafka.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-09-17 00:27:56 -0700
committerChris Lu <chris.lu@gmail.com>2018-09-17 00:27:56 -0700
commit788acdf5275dfb7610afe9144c17d9128d1737f6 (patch)
tree55e6701210157981eecd8a4f72d8c9eefa01c457 /weed/replication/notification_kafka.go
parent865a0179369601e496d385e47bdbda93dcc3f243 (diff)
downloadseaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.tar.xz
seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.zip
add WIP filer.replicate
Diffstat (limited to 'weed/replication/notification_kafka.go')
-rw-r--r--weed/replication/notification_kafka.go77
1 files changed, 77 insertions, 0 deletions
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go
new file mode 100644
index 000000000..ce3b86ae9
--- /dev/null
+++ b/weed/replication/notification_kafka.go
@@ -0,0 +1,77 @@
+package replication
+
+import (
+ "github.com/Shopify/sarama"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &KafkaInput{
+ })
+}
+
// KafkaInput consumes filer event notifications from a Kafka topic.
// initialize starts one goroutine per partition; each forwards consumed
// messages into messageChan, which ReceiveMessage drains one at a time.
type KafkaInput struct {
	topic string // Kafka topic to consume, from the "topic" config key
	consumer sarama.Consumer
	messageChan chan *sarama.ConsumerMessage // fan-in of all partition consumers
}
+
+func (k *KafkaInput) GetName() string {
+ return "kafka"
+}
+
+func (k *KafkaInput) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
+ glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+ return k.initialize(
+ configuration.GetStringSlice("hosts"),
+ configuration.GetString("topic"),
+ )
+}
+
+func (k *KafkaInput) initialize(hosts []string, topic string) (err error) {
+ config := sarama.NewConfig()
+ config.Consumer.Return.Errors = true
+ k.consumer, err = sarama.NewConsumer(hosts, config)
+ k.topic = topic
+ k.messageChan = make(chan *sarama.ConsumerMessage, 1)
+
+ partitions, err := k.consumer.Partitions(topic)
+ if err != nil {
+ panic(err)
+ }
+
+ for _, partition := range partitions {
+ partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ for {
+ select {
+ case err := <-partitionConsumer.Errors():
+ fmt.Println(err)
+ case msg := <-partitionConsumer.Messages():
+ k.messageChan <- msg
+ }
+ }
+ }()
+ }
+
+ return nil
+}
+
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+
+ msg := <-k.messageChan
+
+ key = string(msg.Key)
+ message = &filer_pb.EventNotification{}
+ err = proto.Unmarshal(msg.Value, message)
+
+ return
+}