aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server.go')
-rw-r--r--weed/server/filer_server.go157
1 files changed, 115 insertions, 42 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 9d70e4dac..c6ab6ef0f 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,112 +1,185 @@
package weed_server
import (
+ "context"
+ "fmt"
"net/http"
"os"
+ "strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type FilerOption struct {
Masters []string
Collection string
DefaultReplication string
- RedirectOnRead bool
DisableDirListing bool
MaxMB int
- SecretKey string
DirListingLimit int
DataCenter string
DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ Filers []string
}
type FilerServer struct {
- option *FilerOption
- secret security.Secret
- filer *filer2.Filer
+ option *FilerOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ metaAggregator *filer2.MetaAggregator
+ grpcDialOption grpc.DialOption
+
+ // notifying clients
+ listenersLock sync.Mutex
+ listenersCond *sync.Cond
+
+ brokers map[string]map[string]bool
+ brokersLock sync.Mutex
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
}
+ fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.listenersCond.Broadcast()
+ })
+ fs.filer.Cipher = option.Cipher
+
+ maybeStartMetrics(fs, option)
go fs.filer.KeepConnectedToMaster()
- v := viper.GetViper()
- if !LoadConfiguration("filer", false) {
- v.Set("leveldb.enabled", true)
- v.Set("leveldb.dir", option.DefaultLevelDbDir)
+ v := util.GetViper()
+ if !util.LoadConfiguration("filer", false) {
+ v.Set("leveldb2.enabled", true)
+ v.Set("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
+ glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
}
- LoadConfiguration("notification", false)
+ util.LoadConfiguration("notification", false)
+ fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.SetDefault("filer.options.buckets_folder", "/buckets")
+ fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
+ fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
- notification.LoadConfiguration(v.Sub("notification"))
+ notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
- defaultMux.HandleFunc("/", fs.filerHandler)
+ if !option.DisableHttp {
+ defaultMux.HandleFunc("/", fs.filerHandler)
+ }
if defaultMux != readonlyMux {
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
+ // set peers
+ if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 {
+ glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(option.Filers), option.Filers)
+ }
+ if len(option.Filers) == 0 {
+ option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port))
+ }
+ fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption)
+ fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano())
+
+ fs.filer.LoadBuckets()
+
+ grace.OnInterrupt(func() {
+ fs.filer.Shutdown()
+ })
+
return fs, nil
}
-func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
- return security.GenJwt(fs.secret, fileId)
-}
+func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
-func LoadConfiguration(configFileName string, required bool) (loaded bool) {
-
- // find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
-
- glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
-
- if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
- if required {
- glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a filer.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
- " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+
- "\nOr use this command to generate the default toml file\n"+
- " weed scaffold -config=%s -output=.\n\n\n",
- configFileName, configFileName, configFileName)
- } else {
- return false
+ for _, master := range option.Masters {
+ _, err := pb.ParseFilerGrpcAddress(master)
+ if err != nil {
+ glog.Fatalf("invalid master address %s: %v", master, err)
}
}
- return true
+ isConnected := false
+ var metricsAddress string
+ var metricsIntervalSec int
+ var readErr error
+ for !isConnected {
+ for _, master := range option.Masters {
+ metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
+ if readErr == nil {
+ isConnected = true
+ } else {
+ time.Sleep(7 * time.Second)
+ }
+ }
+ }
+ if metricsAddress == "" && metricsIntervalSec <= 0 {
+ return
+ }
+ go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
+ func() (addr string, intervalSeconds int) {
+ return metricsAddress, metricsIntervalSec
+ })
+}
+func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
+ err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
+ }
+ metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ return nil
+ })
+ return
}