aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-08-19 15:17:55 -0700
committerChris Lu <chris.lu@gmail.com>2018-08-19 15:17:55 -0700
commitf827ada811bb807a9cf9e3db702a01e6f8e4ae3b (patch)
treea1a18dc6e30b906bb97492aa98692003ec5a555d
parentc91372daa6ddceed2ef66e6e36d986658551d237 (diff)
downloadseaweedfs-f827ada811bb807a9cf9e3db702a01e6f8e4ae3b.tar.xz
seaweedfs-f827ada811bb807a9cf9e3db702a01e6f8e4ae3b.zip
merge notification config with filer.toml
-rw-r--r--weed/command/filer.go2
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/filer2/cassandra/cassandra_store.go3
-rw-r--r--weed/filer2/configuration.go36
-rw-r--r--weed/filer2/filerstore.go3
-rw-r--r--weed/filer2/leveldb/leveldb_store.go2
-rw-r--r--weed/filer2/memdb/memdb_store.go3
-rw-r--r--weed/filer2/mysql/mysql_store.go3
-rw-r--r--weed/filer2/postgres/postgres_store.go3
-rw-r--r--weed/filer2/redis/redis_cluster_store.go3
-rw-r--r--weed/filer2/redis/redis_store.go3
-rw-r--r--weed/msgqueue/configuration.go61
-rw-r--r--weed/msgqueue/kafka/kafka_queue.go3
-rw-r--r--weed/msgqueue/log/log_queue.go3
-rw-r--r--weed/msgqueue/message_queue.go7
-rw-r--r--weed/server/filer_server.go36
-rw-r--r--weed/util/config.go134
17 files changed, 70 insertions, 236 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index d42db6418..eaef380a2 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -50,7 +50,6 @@ func init() {
f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 1000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
- f.enableNotification = cmdFiler.Flag.Bool("notify", false, "send file updates to the queue defined in message_queue.toml")
}
var cmdFiler = &Command{
@@ -100,7 +99,6 @@ func (fo *FilerOptions) start() {
SecretKey: *fo.secretKey,
DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter,
- EnableNotification: *fo.enableNotification,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index bef5e635e..842a32376 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -91,7 +91,6 @@ func init() {
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
- filerOptions.enableNotification = cmdServer.Flag.Bool("filer.notify", false, "send file updates to the queue defined in message_queue.toml")
}
func runServer(cmd *Command, args []string) bool {
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index 7552cb524..1309cd1ee 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/gocql/gocql"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -20,7 +21,7 @@ func (store *CassandraStore) GetName() string {
return "cassandra"
}
-func (store *CassandraStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize(
configuration.GetString("keyspace"),
configuration.GetStringSlice("hosts"),
diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go
index dac537673..387672311 100644
--- a/weed/filer2/configuration.go
+++ b/weed/filer2/configuration.go
@@ -4,7 +4,7 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/spf13/viper"
+ "github.com/spf13/viper"
)
const (
@@ -91,29 +91,17 @@ var (
Stores []FilerStore
)
-func (f *Filer) LoadConfiguration() {
-
- // find a filer store
- viper.SetConfigName("filer") // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
- if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
- glog.Fatalf("Failed to load filer.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/" +
- "\n\nPlease follow this example and add a filer.toml file to " +
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n" + FILER_TOML_EXAMPLE)
- }
+func (f *Filer) LoadConfiguration(config *viper.Viper) {
- glog.V(0).Infof("Reading filer configuration from %s", viper.ConfigFileUsed())
for _, store := range Stores {
- if viper.GetBool(store.GetName() + ".enabled") {
- viperSub := viper.Sub(store.GetName())
+ if config.GetBool(store.GetName() + ".enabled") {
+ viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err)
}
f.SetStore(store)
- glog.V(0).Infof("Configure filer for %s from %s", store.GetName(), viper.ConfigFileUsed())
+ glog.V(0).Infof("Configure filer for %s", store.GetName())
return
}
}
@@ -124,19 +112,5 @@ func (f *Filer) LoadConfiguration() {
println(" " + store.GetName())
}
- println()
- println("Please configure a supported filer store in", viper.ConfigFileUsed())
- println()
-
os.Exit(-1)
}
-
-// A simplified interface to decouple from Viper
-type Configuration interface {
- GetString(key string) string
- GetBool(key string) bool
- GetInt(key string) int
- GetInt64(key string) int64
- GetFloat64(key string) float64
- GetStringSlice(key string) []string
-}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index 68fc06a5d..9ef1d9d48 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -2,13 +2,14 @@ package filer2
import (
"errors"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration Configuration) error
+ Initialize(configuration util.Configuration) error
InsertEntry(*Entry) error
UpdateEntry(*Entry) (err error)
// err == filer2.ErrNotFound if not found
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index ac2fb0d66..5f3427a3d 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -27,7 +27,7 @@ func (store *LevelDBStore) GetName() string {
return "leveldb"
}
-func (store *LevelDBStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) {
dir := configuration.GetString("dir")
return store.initialize(dir)
}
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
index 08cee0afd..db3a86bd2 100644
--- a/weed/filer2/memdb/memdb_store.go
+++ b/weed/filer2/memdb/memdb_store.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/google/btree"
"strings"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -27,7 +28,7 @@ func (store *MemDbStore) GetName() string {
return "memory"
}
-func (store *MemDbStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) {
store.tree = btree.New(8)
return nil
}
diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go
index 6e0ca2b68..8adec161f 100644
--- a/weed/filer2/mysql/mysql_store.go
+++ b/weed/filer2/mysql/mysql_store.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
_ "github.com/go-sql-driver/mysql"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -25,7 +26,7 @@ func (store *MysqlStore) GetName() string {
return "mysql"
}
-func (store *MysqlStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize(
configuration.GetString("username"),
configuration.GetString("password"),
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
index f5904ab21..8836e2be7 100644
--- a/weed/filer2/postgres/postgres_store.go
+++ b/weed/filer2/postgres/postgres_store.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
_ "github.com/lib/pq"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -25,7 +26,7 @@ func (store *PostgresStore) GetName() string {
return "postgres"
}
-func (store *PostgresStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize(
configuration.GetString("username"),
configuration.GetString("password"),
diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go
index 02a62567c..8a3dce4b8 100644
--- a/weed/filer2/redis/redis_cluster_store.go
+++ b/weed/filer2/redis/redis_cluster_store.go
@@ -3,6 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/go-redis/redis"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -17,7 +18,7 @@ func (store *RedisClusterStore) GetName() string {
return "redis_cluster"
}
-func (store *RedisClusterStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize(
configuration.GetStringSlice("addresses"),
)
diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go
index 85236a5af..77d157ab4 100644
--- a/weed/filer2/redis/redis_store.go
+++ b/weed/filer2/redis/redis_store.go
@@ -3,6 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/go-redis/redis"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -17,7 +18,7 @@ func (store *RedisStore) GetName() string {
return "redis"
}
-func (store *RedisStore) Initialize(configuration filer2.Configuration) (err error) {
+func (store *RedisStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize(
configuration.GetString("address"),
configuration.GetString("password"),
diff --git a/weed/msgqueue/configuration.go b/weed/msgqueue/configuration.go
index 525809d73..d053f892f 100644
--- a/weed/msgqueue/configuration.go
+++ b/weed/msgqueue/configuration.go
@@ -1,82 +1,29 @@
package msgqueue
import (
- "os"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
)
-const (
- MSG_QUEUE_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS message queue
-
-[log]
-enabled = true
-
-[kafka]
-enabled = false
-hosts = [
- "localhost:9092"
-]
-topic = "seaweedfs_filer"
-
-`
-)
-
var (
MessageQueues []MessageQueue
Queue MessageQueue
)
-func LoadConfiguration() {
-
- // find a filer store
- viper.SetConfigName("message_queue") // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
- if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
- glog.Fatalf("Failed to load message_queue.toml file from current directory, or $HOME/.seaweedfs/, "+
- "or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a message_queue.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+MSG_QUEUE_TOML_EXAMPLE)
- }
+func LoadConfiguration(config *viper.Viper) {
- glog.V(0).Infof("Reading message queue configuration from %s", viper.ConfigFileUsed())
for _, store := range MessageQueues {
- if viper.GetBool(store.GetName() + ".enabled") {
- viperSub := viper.Sub(store.GetName())
+ if config.GetBool(store.GetName() + ".enabled") {
+ viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err)
}
Queue = store
- glog.V(0).Infof("Configure message queue for %s from %s", store.GetName(), viper.ConfigFileUsed())
+ glog.V(0).Infof("Configure message queue for %s", store.GetName())
return
}
}
- println()
- println("Supported message queues are:")
- for _, store := range MessageQueues {
- println(" " + store.GetName())
- }
-
- println()
- println("Please configure a supported message queue in", viper.ConfigFileUsed())
- println()
-
- os.Exit(-1)
-}
-
-// A simplified interface to decouple from Viper
-type Configuration interface {
- GetString(key string) string
- GetBool(key string) bool
- GetInt(key string) int
- GetInt64(key string) int64
- GetFloat64(key string) float64
- GetStringSlice(key string) []string
}
diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go
index f070fd597..7f0273ad9 100644
--- a/weed/msgqueue/kafka/kafka_queue.go
+++ b/weed/msgqueue/kafka/kafka_queue.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -20,7 +21,7 @@ func (k *KafkaQueue) GetName() string {
return "kafka"
}
-func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic"))
return k.initialize(
diff --git a/weed/msgqueue/log/log_queue.go b/weed/msgqueue/log/log_queue.go
index 9ce9ff8be..612212ae0 100644
--- a/weed/msgqueue/log/log_queue.go
+++ b/weed/msgqueue/log/log_queue.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/msgqueue"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -17,7 +18,7 @@ func (k *LogQueue) GetName() string {
return "log"
}
-func (k *LogQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+func (k *LogQueue) Initialize(configuration util.Configuration) (err error) {
return nil
}
diff --git a/weed/msgqueue/message_queue.go b/weed/msgqueue/message_queue.go
index 6d57b9b3b..a14d9b480 100644
--- a/weed/msgqueue/message_queue.go
+++ b/weed/msgqueue/message_queue.go
@@ -1,11 +1,14 @@
package msgqueue
-import "github.com/golang/protobuf/proto"
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
type MessageQueue interface {
// GetName gets the name to locate the configuration in message_queue.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration Configuration) error
+ Initialize(configuration util.Configuration) error
SendMessage(key string, message proto.Message) error
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 61bb6e0ea..7a345abe9 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -11,10 +11,11 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/msgqueue"
_ "github.com/chrislusf/seaweedfs/weed/msgqueue/kafka"
_ "github.com/chrislusf/seaweedfs/weed/msgqueue/log"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/msgqueue"
)
type FilerOption struct {
@@ -27,7 +28,6 @@ type FilerOption struct {
SecretKey string
DirListingLimit int
DataCenter string
- EnableNotification bool
}
type FilerServer struct {
@@ -49,11 +49,12 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
go fs.filer.KeepConnectedToMaster()
- fs.filer.LoadConfiguration()
+ loadConfiguration("filer", true)
+ v := viper.GetViper()
- if fs.option.EnableNotification {
- msgqueue.LoadConfiguration()
- }
+ fs.filer.LoadConfiguration(v)
+
+ msgqueue.LoadConfiguration(v.Sub("notification"))
defaultMux.HandleFunc("/favicon.ico", faviconHandler)
defaultMux.HandleFunc("/", fs.filerHandler)
@@ -67,3 +68,26 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(fs.secret, fileId)
}
+
+func loadConfiguration(configFileName string, required bool) {
+
+ // find a filer store
+ viper.SetConfigName(configFileName) // name of config file (without extension)
+ viper.AddConfigPath(".") // optionally look for config in the working directory
+ viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+ viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
+
+ glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
+
+ if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
+ glog.V(0).Infof("Reading %s: %v", configFileName, viper.ConfigFileUsed(), err)
+ if required {
+ glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
+ "\n\nPlease follow this example and add a filer.toml file to "+
+ "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
+ " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n",
+ configFileName, configFileName)
+ }
+ }
+
+}
diff --git a/weed/util/config.go b/weed/util/config.go
index e4549c322..77cab3019 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -1,130 +1,10 @@
package util
-// Copyright 2011 Numerotron Inc.
-// Use of this source code is governed by an MIT-style license
-// that can be found in the LICENSE file.
-//
-// Developed at www.stathat.com by Patrick Crosby
-// Contact us on twitter with any questions: twitter.com/stat_hat
-
-// The jconfig package provides a simple, basic configuration file parser using JSON.
-
-import (
- "bytes"
- "encoding/json"
- "os"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
-)
-
-type Config struct {
- data map[string]interface{}
- filename string
-}
-
-func newConfig() *Config {
- result := new(Config)
- result.data = make(map[string]interface{})
- return result
-}
-
-// Loads config information from a JSON file
-func LoadConfig(filename string) *Config {
- result := newConfig()
- result.filename = filename
- err := result.parse()
- if err != nil {
- glog.Fatalf("error loading config file %s: %s", filename, err)
- }
- return result
-}
-
-// Loads config information from a JSON string
-func LoadConfigString(s string) *Config {
- result := newConfig()
- err := json.Unmarshal([]byte(s), &result.data)
- if err != nil {
- glog.Fatalf("error parsing config string %s: %s", s, err)
- }
- return result
-}
-
-func (c *Config) StringMerge(s string) {
- next := LoadConfigString(s)
- c.merge(next.data)
-}
-
-func (c *Config) LoadMerge(filename string) {
- next := LoadConfig(filename)
- c.merge(next.data)
-}
-
-func (c *Config) merge(ndata map[string]interface{}) {
- for k, v := range ndata {
- c.data[k] = v
- }
-}
-
-func (c *Config) parse() error {
- f, err := os.Open(c.filename)
- if err != nil {
- return err
- }
- defer f.Close()
- b := new(bytes.Buffer)
- _, err = b.ReadFrom(f)
- if err != nil {
- return err
- }
- err = json.Unmarshal(b.Bytes(), &c.data)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// Returns a string for the config variable key
-func (c *Config) GetString(key string) string {
- result, present := c.data[key]
- if !present {
- return ""
- }
- return result.(string)
-}
-
-// Returns an int for the config variable key
-func (c *Config) GetInt(key string) int {
- x, ok := c.data[key]
- if !ok {
- return -1
- }
- return int(x.(float64))
-}
-
-// Returns a float for the config variable key
-func (c *Config) GetFloat(key string) float64 {
- x, ok := c.data[key]
- if !ok {
- return -1
- }
- return x.(float64)
-}
-
-// Returns a bool for the config variable key
-func (c *Config) GetBool(key string) bool {
- x, ok := c.data[key]
- if !ok {
- return false
- }
- return x.(bool)
-}
-
-// Returns an array for the config variable key
-func (c *Config) GetArray(key string) []interface{} {
- result, present := c.data[key]
- if !present {
- return []interface{}(nil)
- }
- return result.([]interface{})
+type Configuration interface {
+ GetString(key string) string
+ GetBool(key string) bool
+ GetInt(key string) int
+ GetInt64(key string) int64
+ GetFloat64(key string) float64
+ GetStringSlice(key string) []string
}