diff options
Diffstat (limited to 'weed/server/filer_server.go')
| -rw-r--r-- | weed/server/filer_server.go | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 534bc4840..6bf0261ee 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -16,10 +16,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer" + _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" @@ -34,7 +36,9 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" @@ -46,7 +50,8 @@ import ( ) type FilerOption struct { - Masters []string + Masters map[string]pb.ServerAddress + FilerGroup string Collection string DefaultReplication string DisableDirListing bool @@ -54,21 +59,23 @@ type FilerOption struct { DirListingLimit int DataCenter string Rack string + DataNode string DefaultLevelDbDir string DisableHttp bool - Host string - Port uint32 + Host pb.ServerAddress recursiveDelete bool Cipher bool SaveToFilerLimit int64 - Filers []string ConcurrentUploadLimit int64 + ShowUIDirectoryDelete bool } type FilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer option *FilerOption secret security.SigningKey filer *filer.Filer + filerGuard *security.Guard grpcDialOption grpc.DialOption // metrics read from the master @@ -79,6 +86,10 @@ type FilerServer struct { listenersLock sync.Mutex listenersCond *sync.Cond + // track known metadata listeners + knownListenersLock sync.Mutex + knownListeners map[int32]struct{} + brokers map[string]map[string]bool brokersLock sync.Mutex @@ -88,9 +99,19 @@ type FilerServer struct { func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { + v := util.GetViper() + signingKey := v.GetString("jwt.filer_signing.key") + v.SetDefault("jwt.filer_signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds") + + readSigningKey := v.GetString("jwt.filer_signing.read.key") + v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), + knownListeners: make(map[int32]struct{}), brokers: make(map[string]map[string]bool), inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } @@ -100,20 +121,21 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() { + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher + // we do not support IP whitelist right now + fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) fs.checkWithMaster() - go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) - go fs.filer.KeepConnectedToMaster() + go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) + go fs.filer.KeepMasterClientConnected() - v := util.GetViper() if !util.LoadConfiguration("filer", false) { - v.Set("leveldb2.enabled", true) - v.Set("leveldb2.dir", option.DefaultLevelDbDir) + v.SetDefault("leveldb2.enabled", true) + v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) @@ -130,7 +152,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) // TODO deprecated, will be be removed after 2020-12-31 // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") - fs.filer.LoadConfiguration(v) + isFresh := fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") @@ -143,9 +165,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers) - - fs.filer.LoadBuckets() + existingNodes := fs.filer.ListExistingPeerUpdates() + startFromTime := time.Now().Add(-filer.LogFlushInterval) + if isFresh { + glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes) + if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil { + glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes) + } + } + fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime) fs.filer.LoadFilerConf() @@ -160,17 +188,10 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) func (fs *FilerServer) checkWithMaster() { - for _, master := range fs.option.Masters { - _, err := pb.ParseServerToGrpcAddress(master) - if err != nil { - glog.Fatalf("invalid master address %s: %v", master, err) - } - } - isConnected := false for !isConnected { for _, master := range fs.option.Masters { - readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) |
