aboutsummaryrefslogtreecommitdiff
path: root/weed/msgqueue
diff options
context:
space:
mode:
Diffstat (limited to 'weed/msgqueue')
-rw-r--r--weed/msgqueue/configuration.go82
-rw-r--r--weed/msgqueue/kafka/kafka_queue.go76
-rw-r--r--weed/msgqueue/log/log_queue.go29
-rw-r--r--weed/msgqueue/message_queue.go11
4 files changed, 198 insertions, 0 deletions
diff --git a/weed/msgqueue/configuration.go b/weed/msgqueue/configuration.go
new file mode 100644
index 000000000..63f103c1e
--- /dev/null
+++ b/weed/msgqueue/configuration.go
@@ -0,0 +1,82 @@
+package msgqueue
+
+import (
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+)
+
+const (
+ MSG_QUEUE_TOML_EXAMPLE = `
+# A sample TOML config file for SeaweedFS message queue
+
+[log]
+enabled = true
+
+[kafka]
+enabled = false
+hosts = [
+ "localhost:9092"
+]
+topic = "seaweedfs_filer"
+
+`
+)
+
+var (
+ MessageQueues []MessageQueue
+
+ Queue MessageQueue
+)
+
+func LoadConfiguration() {
+
+ // find a filer store
+ viper.SetConfigName("message_queue") // name of config file (without extension)
+ viper.AddConfigPath(".") // optionally look for config in the working directory
+ viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+ viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
+ if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
+ glog.Fatalf("Failed to load message_queue.toml file from current directory, or $HOME/.seaweedfs/, "+
+ "or /etc/seaweedfs/"+
+ "\n\nPlease follow this example and add a message_queue.toml file to "+
+ "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+MSG_QUEUE_TOML_EXAMPLE, err)
+ }
+
+ glog.V(0).Infof("Reading message queue configuration from %s", viper.ConfigFileUsed())
+ for _, store := range MessageQueues {
+ if viper.GetBool(store.GetName() + ".enabled") {
+ viperSub := viper.Sub(store.GetName())
+ if err := store.Initialize(viperSub); err != nil {
+ glog.Fatalf("Failed to initialize store for %s: %+v",
+ store.GetName(), err)
+ }
+ Queue = store
+ glog.V(0).Infof("Configure message queue for %s from %s", store.GetName(), viper.ConfigFileUsed())
+ return
+ }
+ }
+
+ println()
+ println("Supported message queues are:")
+ for _, store := range MessageQueues {
+ println(" " + store.GetName())
+ }
+
+ println()
+ println("Please configure a supported message queue in", viper.ConfigFileUsed())
+ println()
+
+ os.Exit(-1)
+}
+
+// A simplified interface to decouple from Viper
+type Configuration interface {
+ GetString(key string) string
+ GetBool(key string) bool
+ GetInt(key string) int
+ GetInt64(key string) int64
+ GetFloat64(key string) float64
+ GetStringSlice(key string) []string
+}
diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go
new file mode 100644
index 000000000..c013baf5f
--- /dev/null
+++ b/weed/msgqueue/kafka/kafka_queue.go
@@ -0,0 +1,76 @@
+package kafka
+
+import (
+ _ "github.com/go-sql-driver/mysql"
+ "github.com/chrislusf/seaweedfs/weed/msgqueue"
+ "github.com/golang/protobuf/proto"
+ "github.com/Shopify/sarama"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func init() {
+ msgqueue.MessageQueues = append(msgqueue.MessageQueues, &KafkaQueue{})
+}
+
+type KafkaQueue struct {
+ topic string
+ producer sarama.AsyncProducer
+}
+
+func (k *KafkaQueue) GetName() string {
+ return "kafka"
+}
+
+func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+ return k.initialize(
+ configuration.GetStringSlice("hosts"),
+ configuration.GetString("topic"),
+ )
+}
+
+func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
+ config := sarama.NewConfig()
+ config.Producer.RequiredAcks = sarama.WaitForLocal
+ config.Producer.Partitioner = sarama.NewHashPartitioner
+ config.Producer.Return.Successes = true
+ config.Producer.Return.Errors = true
+ k.producer, err = sarama.NewAsyncProducer(hosts, config)
+ go k.handleSuccess()
+ go k.handleError()
+ return nil
+}
+
+func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
+ bytes, err := proto.Marshal(message)
+ if err != nil {
+ return
+ }
+
+ msg := &sarama.ProducerMessage{
+ Topic: k.topic,
+ Key: sarama.StringEncoder(key),
+ Value: sarama.ByteEncoder(bytes),
+ }
+
+ k.producer.Input() <- msg
+
+ return nil
+}
+
+func (k *KafkaQueue) handleSuccess() {
+ for {
+ pm := <-k.producer.Successes()
+ if pm != nil {
+ glog.Infof("producer message success, partition:%d offset:%d key:%v valus:%s", pm.Partition, pm.Offset, pm.Key, pm.Value)
+ }
+ }
+}
+
+func (k *KafkaQueue) handleError() {
+ for {
+ err := <-k.producer.Errors()
+ if err != nil {
+ glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v)", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err)
+ }
+ }
+}
diff --git a/weed/msgqueue/log/log_queue.go b/weed/msgqueue/log/log_queue.go
new file mode 100644
index 000000000..ef7967e6c
--- /dev/null
+++ b/weed/msgqueue/log/log_queue.go
@@ -0,0 +1,29 @@
+package kafka
+
+import (
+ _ "github.com/go-sql-driver/mysql"
+ "github.com/chrislusf/seaweedfs/weed/msgqueue"
+ "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func init() {
+ msgqueue.MessageQueues = append(msgqueue.MessageQueues, &LogQueue{})
+}
+
+type LogQueue struct {
+}
+
+func (k *LogQueue) GetName() string {
+ return "log"
+}
+
+func (k *LogQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+ return nil
+}
+
+func (k *LogQueue) SendMessage(key string, message proto.Message) (err error) {
+
+ glog.V(0).Infof("%v: %+v", key, message)
+ return nil
+}
diff --git a/weed/msgqueue/message_queue.go b/weed/msgqueue/message_queue.go
new file mode 100644
index 000000000..6d57b9b3b
--- /dev/null
+++ b/weed/msgqueue/message_queue.go
@@ -0,0 +1,11 @@
+package msgqueue
+
+import "github.com/golang/protobuf/proto"
+
+type MessageQueue interface {
+ // GetName gets the name to locate the configuration in message_queue.toml file
+ GetName() string
+ // Initialize initializes the file store
+ Initialize(configuration Configuration) error
+ SendMessage(key string, message proto.Message) error
+}