aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server.go')
-rw-r--r--weed/server/filer_server.go65
1 files changed, 43 insertions, 22 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 534bc4840..6bf0261ee 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -16,10 +16,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
@@ -34,7 +36,9 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/ydb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -46,7 +50,8 @@ import (
)
type FilerOption struct {
- Masters []string
+ Masters map[string]pb.ServerAddress
+ FilerGroup string
Collection string
DefaultReplication string
DisableDirListing bool
@@ -54,21 +59,23 @@ type FilerOption struct {
DirListingLimit int
DataCenter string
Rack string
+ DataNode string
DefaultLevelDbDir string
DisableHttp bool
- Host string
- Port uint32
+ Host pb.ServerAddress
recursiveDelete bool
Cipher bool
SaveToFilerLimit int64
- Filers []string
ConcurrentUploadLimit int64
+ ShowUIDirectoryDelete bool
}
type FilerServer struct {
+ filer_pb.UnimplementedSeaweedFilerServer
option *FilerOption
secret security.SigningKey
filer *filer.Filer
+ filerGuard *security.Guard
grpcDialOption grpc.DialOption
// metrics read from the master
@@ -79,6 +86,10 @@ type FilerServer struct {
listenersLock sync.Mutex
listenersCond *sync.Cond
+ // track known metadata listeners
+ knownListenersLock sync.Mutex
+ knownListeners map[int32]struct{}
+
brokers map[string]map[string]bool
brokersLock sync.Mutex
@@ -88,9 +99,19 @@ type FilerServer struct {
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
+ v := util.GetViper()
+ signingKey := v.GetString("jwt.filer_signing.key")
+ v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.filer_signing.read.key")
+ v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
+
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ knownListeners: make(map[int32]struct{}),
brokers: make(map[string]map[string]bool),
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
@@ -100,20 +121,21 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
+ // we do not support IP whitelist right now
+ fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
fs.checkWithMaster()
- go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
- go fs.filer.KeepConnectedToMaster()
+ go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
+ go fs.filer.KeepMasterClientConnected()
- v := util.GetViper()
if !util.LoadConfiguration("filer", false) {
- v.Set("leveldb2.enabled", true)
- v.Set("leveldb2.dir", option.DefaultLevelDbDir)
+ v.SetDefault("leveldb2.enabled", true)
+ v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
@@ -130,7 +152,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
// TODO deprecated, will be be removed after 2020-12-31
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
- fs.filer.LoadConfiguration(v)
+ isFresh := fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
@@ -143,9 +165,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
-
- fs.filer.LoadBuckets()
+ existingNodes := fs.filer.ListExistingPeerUpdates()
+ startFromTime := time.Now().Add(-filer.LogFlushInterval)
+ if isFresh {
+ glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
+ if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
+ glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes)
+ }
+ }
+ fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
fs.filer.LoadFilerConf()
@@ -160,17 +188,10 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
- for _, master := range fs.option.Masters {
- _, err := pb.ParseServerToGrpcAddress(master)
- if err != nil {
- glog.Fatalf("invalid master address %s: %v", master, err)
- }
- }
-
isConnected := false
for !isConnected {
for _, master := range fs.option.Masters {
- readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)