diff options
Diffstat (limited to 'weed/server/filer_server.go')
| -rw-r--r-- | weed/server/filer_server.go | 157 |
1 files changed, 115 insertions, 42 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 9d70e4dac..c6ab6ef0f 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,112 +1,185 @@ package weed_server import ( + "context" + "fmt" "net/http" "os" + "strings" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" + _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" ) type FilerOption struct { Masters []string Collection string DefaultReplication string - RedirectOnRead bool DisableDirListing bool MaxMB int - SecretKey string DirListingLimit int DataCenter string DefaultLevelDbDir string + DisableHttp bool + Host string + Port uint32 + recursiveDelete bool + Cipher bool + Filers []string } type FilerServer struct { - option *FilerOption - secret security.Secret - filer *filer2.Filer + option *FilerOption + secret security.SigningKey + filer *filer2.Filer + metaAggregator *filer2.MetaAggregator + grpcDialOption grpc.DialOption + + // notifying clients + listenersLock sync.Mutex + listenersCond *sync.Cond + + brokers map[string]map[string]bool + brokersLock sync.Mutex } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ - option: option, + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), + brokers: make(map[string]map[string]bool), } + fs.listenersCond = sync.NewCond(&fs.listenersLock) if len(option.Masters) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { + fs.listenersCond.Broadcast() + }) + fs.filer.Cipher = option.Cipher + + maybeStartMetrics(fs, option) go fs.filer.KeepConnectedToMaster() - v := viper.GetViper() - if !LoadConfiguration("filer", false) { - v.Set("leveldb.enabled", true) - v.Set("leveldb.dir", option.DefaultLevelDbDir) + v := util.GetViper() + if !util.LoadConfiguration("filer", false) { + v.Set("leveldb2.enabled", true) + v.Set("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) } + glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) } - LoadConfiguration("notification", false) + util.LoadConfiguration("notification", false) + fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") + v.SetDefault("filer.options.buckets_folder", "/buckets") + fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") + fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") fs.filer.LoadConfiguration(v) - notification.LoadConfiguration(v.Sub("notification")) + notification.LoadConfiguration(v, "notification.") handleStaticResources(defaultMux) - defaultMux.HandleFunc("/", fs.filerHandler) + if !option.DisableHttp { + defaultMux.HandleFunc("/", fs.filerHandler) + } if defaultMux != readonlyMux { readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } + // set peers + if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 { + glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(option.Filers), option.Filers) + } + if len(option.Filers) == 0 { + option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port)) + } + fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption) + fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano()) + + fs.filer.LoadBuckets() + + grace.OnInterrupt(func() { + fs.filer.Shutdown() + }) + return fs, nil } -func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(fs.secret, fileId) -} +func maybeStartMetrics(fs *FilerServer, option *FilerOption) { -func LoadConfiguration(configFileName string, required bool) (loaded bool) { - - // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in - - glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) - - if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file - glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) - if required { - glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ - "\n\nPlease follow this example and add a filer.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ - " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n\n\n", - configFileName, configFileName, configFileName) - } else { - return false + for _, master := range option.Masters { + _, err := pb.ParseFilerGrpcAddress(master) + if err != nil { + glog.Fatalf("invalid master address %s: %v", master, err) } } - return true + isConnected := false + var metricsAddress string + var metricsIntervalSec int + var readErr error + for !isConnected { + for _, master := range option.Masters { + metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) + if readErr == nil { + isConnected = true + } else { + time.Sleep(7 * time.Second) + } + } + } + if metricsAddress == "" && metricsIntervalSec <= 0 { + return + } + go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, + func() (addr string, intervalSeconds int) { + return metricsAddress, metricsIntervalSec + }) +} +func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { + err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", masterAddress, err) + } + metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + return nil + }) + return } |
