aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server.go')
-rw-r--r--weed/server/filer_server.go118
1 files changed, 81 insertions, 37 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2aabb9932..656bb2ed8 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,96 +1,140 @@
package weed_server
import (
+ "context"
+ "fmt"
"net/http"
+ "os"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type FilerOption struct {
Masters []string
Collection string
DefaultReplication string
- RedirectOnRead bool
DisableDirListing bool
MaxMB int
- SecretKey string
DirListingLimit int
DataCenter string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
}
type FilerServer struct {
- option *FilerOption
- secret security.Secret
- filer *filer2.Filer
+ option *FilerOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
}
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000)
+ fs.filer.Cipher = option.Cipher
go fs.filer.KeepConnectedToMaster()
- LoadConfiguration("filer", true)
- v := viper.GetViper()
+ v := util.GetViper()
+ if !util.LoadConfiguration("filer", false) {
+ v.Set("leveldb2.enabled", true)
+ v.Set("leveldb2.dir", option.DefaultLevelDbDir)
+ _, err := os.Stat(option.DefaultLevelDbDir)
+ if os.IsNotExist(err) {
+ os.MkdirAll(option.DefaultLevelDbDir, 0755)
+ }
+ }
+ util.LoadConfiguration("notification", false)
+ fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.Set("filer.option.buckets_folder", "/buckets")
+ v.Set("filer.option.queues_folder", "/queues")
+ fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder")
+ fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder")
fs.filer.LoadConfiguration(v)
- notification.LoadConfiguration(v.Sub("notification"))
+ notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
- defaultMux.HandleFunc("/", fs.filerHandler)
+ if !option.DisableHttp {
+ defaultMux.HandleFunc("/", fs.filerHandler)
+ }
if defaultMux != readonlyMux {
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- return fs, nil
-}
+ fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
+
+ maybeStartMetrics(fs, option)
-func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
- return security.GenJwt(fs.secret, fileId)
+ return fs, nil
}
-func LoadConfiguration(configFileName string, required bool) {
-
- // find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
-
- glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
-
- if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file
- glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
- if required {
- glog.Errorf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a filer.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
- " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+
- "\n\nOr use this command to generate the default toml file\n"+
- " weed scaffold -config=%s -output=.\n",
- configFileName, configFileName, configFileName)
+func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+ isConnected := false
+ var metricsAddress string
+ var metricsIntervalSec int
+ var readErr error
+ for !isConnected {
+ metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
+ if readErr == nil {
+ isConnected = true
+ } else {
+ time.Sleep(7 * time.Second)
}
}
+ if metricsAddress == "" && metricsIntervalSec <= 0 {
+ return
+ }
+ go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
+ func() (addr string, intervalSeconds int) {
+ return metricsAddress, metricsIntervalSec
+ })
+}
+func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
+ err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
+ }
+ metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ return nil
+ })
+ return
}