aboutsummaryrefslogtreecommitdiff
path: root/weed/msgqueue
diff options
context:
space:
mode:
Diffstat (limited to 'weed/msgqueue')
-rw-r--r--weed/msgqueue/configuration.go61
-rw-r--r--weed/msgqueue/kafka/kafka_queue.go3
-rw-r--r--weed/msgqueue/log/log_queue.go3
-rw-r--r--weed/msgqueue/message_queue.go7
4 files changed, 13 insertions, 61 deletions
diff --git a/weed/msgqueue/configuration.go b/weed/msgqueue/configuration.go
index 525809d73..d053f892f 100644
--- a/weed/msgqueue/configuration.go
+++ b/weed/msgqueue/configuration.go
@@ -1,82 +1,29 @@
package msgqueue
import (
- "os"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
)
-const (
- MSG_QUEUE_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS message queue
-
-[log]
-enabled = true
-
-[kafka]
-enabled = false
-hosts = [
- "localhost:9092"
-]
-topic = "seaweedfs_filer"
-
-`
-)
-
var (
MessageQueues []MessageQueue
Queue MessageQueue
)
-func LoadConfiguration() {
-
- // find a filer store
- viper.SetConfigName("message_queue") // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
- if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
- glog.Fatalf("Failed to load message_queue.toml file from current directory, or $HOME/.seaweedfs/, "+
- "or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a message_queue.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+MSG_QUEUE_TOML_EXAMPLE)
- }
+func LoadConfiguration(config *viper.Viper) {
- glog.V(0).Infof("Reading message queue configuration from %s", viper.ConfigFileUsed())
for _, store := range MessageQueues {
- if viper.GetBool(store.GetName() + ".enabled") {
- viperSub := viper.Sub(store.GetName())
+ if config.GetBool(store.GetName() + ".enabled") {
+ viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err)
}
Queue = store
- glog.V(0).Infof("Configure message queue for %s from %s", store.GetName(), viper.ConfigFileUsed())
+ glog.V(0).Infof("Configure message queue for %s", store.GetName())
return
}
}
- println()
- println("Supported message queues are:")
- for _, store := range MessageQueues {
- println(" " + store.GetName())
- }
-
- println()
- println("Please configure a supported message queue in", viper.ConfigFileUsed())
- println()
-
- os.Exit(-1)
-}
-
-// A simplified interface to decouple from Viper
-type Configuration interface {
- GetString(key string) string
- GetBool(key string) bool
- GetInt(key string) int
- GetInt64(key string) int64
- GetFloat64(key string) float64
- GetStringSlice(key string) []string
}
diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go
index f070fd597..7f0273ad9 100644
--- a/weed/msgqueue/kafka/kafka_queue.go
+++ b/weed/msgqueue/kafka/kafka_queue.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -20,7 +21,7 @@ func (k *KafkaQueue) GetName() string {
return "kafka"
}
-func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic"))
return k.initialize(
diff --git a/weed/msgqueue/log/log_queue.go b/weed/msgqueue/log/log_queue.go
index 9ce9ff8be..612212ae0 100644
--- a/weed/msgqueue/log/log_queue.go
+++ b/weed/msgqueue/log/log_queue.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -17,7 +18,7 @@ func (k *LogQueue) GetName() string {
return "log"
}
-func (k *LogQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+func (k *LogQueue) Initialize(configuration util.Configuration) (err error) {
return nil
}
diff --git a/weed/msgqueue/message_queue.go b/weed/msgqueue/message_queue.go
index 6d57b9b3b..a14d9b480 100644
--- a/weed/msgqueue/message_queue.go
+++ b/weed/msgqueue/message_queue.go
@@ -1,11 +1,14 @@
package msgqueue
-import "github.com/golang/protobuf/proto"
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
type MessageQueue interface {
// GetName gets the name to locate the configuration in message_queue.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration Configuration) error
+ Initialize(configuration util.Configuration) error
SendMessage(key string, message proto.Message) error
}