diff options
Diffstat (limited to 'weed/server/filer_server.go')
| -rw-r--r-- | weed/server/filer_server.go | 118 |
1 files changed, 81 insertions, 37 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2aabb9932..656bb2ed8 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,96 +1,140 @@ package weed_server import ( + "context" + "fmt" "net/http" + "os" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" + _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" + _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" ) type FilerOption struct { Masters []string Collection string DefaultReplication string - RedirectOnRead bool DisableDirListing bool MaxMB int - SecretKey string DirListingLimit int DataCenter string + DefaultLevelDbDir string + DisableHttp bool + Port uint32 + recursiveDelete bool + Cipher bool } type FilerServer struct { - option *FilerOption - secret security.Secret - filer *filer2.Filer + option *FilerOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ - option: option, + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), } if len(option.Masters) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000) + fs.filer.Cipher = option.Cipher go fs.filer.KeepConnectedToMaster() - LoadConfiguration("filer", true) - v := viper.GetViper() + v := util.GetViper() + if !util.LoadConfiguration("filer", false) { + v.Set("leveldb2.enabled", true) + v.Set("leveldb2.dir", option.DefaultLevelDbDir) + _, err := os.Stat(option.DefaultLevelDbDir) + if os.IsNotExist(err) { + os.MkdirAll(option.DefaultLevelDbDir, 0755) + } + } + util.LoadConfiguration("notification", false) + fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") + v.Set("filer.option.buckets_folder", "/buckets") + v.Set("filer.option.queues_folder", "/queues") + fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder") + fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder") fs.filer.LoadConfiguration(v) - notification.LoadConfiguration(v.Sub("notification")) + notification.LoadConfiguration(v, "notification.") handleStaticResources(defaultMux) - defaultMux.HandleFunc("/", fs.filerHandler) + if !option.DisableHttp { + defaultMux.HandleFunc("/", fs.filerHandler) + } if defaultMux != readonlyMux { readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - return fs, nil -} + fs.filer.LoadBuckets(fs.filer.DirBucketsPath) + + maybeStartMetrics(fs, option) -func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(fs.secret, fileId) + return fs, nil } -func LoadConfiguration(configFileName string, required bool) { - - // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in - - glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) - - if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file - glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) - if required { - glog.Errorf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ - "\n\nPlease follow this example and add a filer.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ - " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\n\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n", - configFileName, configFileName, configFileName) +func maybeStartMetrics(fs *FilerServer, option *FilerOption) { + isConnected := false + var metricsAddress string + var metricsIntervalSec int + var readErr error + for !isConnected { + metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0]) + if readErr == nil { + isConnected = true + } else { + time.Sleep(7 * time.Second) } } + if metricsAddress == "" && metricsIntervalSec <= 0 { + return + } + go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, + func() (addr string, intervalSeconds int) { + return metricsAddress, metricsIntervalSec + }) +} +func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) { + err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err) + } + metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + return nil + }) + return } |
