aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server.go')
-rw-r--r--weed/server/filer_server.go171
1 files changed, 114 insertions, 57 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 41ba81366..2734223ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -5,23 +5,35 @@ import (
"fmt"
"net/http"
"os"
+ "sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -30,46 +42,74 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type FilerOption struct {
- Masters []string
- Collection string
- DefaultReplication string
- RedirectOnRead bool
- DisableDirListing bool
- MaxMB int
- DirListingLimit int
- DataCenter string
- DefaultLevelDbDir string
- DisableHttp bool
- Port int
+ Masters []string
+ Collection string
+ DefaultReplication string
+ DisableDirListing bool
+ MaxMB int
+ DirListingLimit int
+ DataCenter string
+ Rack string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ SaveToFilerLimit int64
+ Filers []string
+ ConcurrentUploadLimit int64
}
type FilerServer struct {
option *FilerOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
+
+ // metrics read from the master
+ metricsAddress string
+ metricsIntervalSec int
+
+ // notifying clients
+ listenersLock sync.Mutex
+ listenersCond *sync.Cond
+
+ brokers map[string]map[string]bool
+ brokersLock sync.Mutex
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
+ fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.listenersCond.Broadcast()
+ })
+ fs.filer.Cipher = option.Cipher
+
+ fs.checkWithMaster()
+ go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepConnectedToMaster()
- v := viper.GetViper()
+ v := util.GetViper()
if !util.LoadConfiguration("filer", false) {
v.Set("leveldb2.enabled", true)
v.Set("leveldb2.dir", option.DefaultLevelDbDir)
@@ -77,56 +117,73 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
+ glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
+ } else {
+ glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
+ fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.SetDefault("filer.options.buckets_folder", "/buckets")
+ fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
+ // TODO deprecated, will be be removed after 2020-12-31
+ // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
+ fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
- notification.LoadConfiguration(v.Sub("notification"))
+ notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
if !option.DisableHttp {
defaultMux.HandleFunc("/", fs.filerHandler)
}
if defaultMux != readonlyMux {
+ handleStaticResources(readonlyMux)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- maybeStartMetrics(fs, option)
+ fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
+
+ fs.filer.LoadBuckets()
+
+ fs.filer.LoadFilerConf()
+
+ grace.OnInterrupt(func() {
+ fs.filer.Shutdown()
+ })
return fs, nil
}
-func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+func (fs *FilerServer) checkWithMaster() {
+
+ for _, master := range fs.option.Masters {
+ _, err := pb.ParseServerToGrpcAddress(master)
+ if err != nil {
+ glog.Fatalf("invalid master address %s: %v", master, err)
+ }
+ }
+
isConnected := false
- var metricsAddress string
- var metricsIntervalSec int
- var readErr error
for !isConnected {
- metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
- if readErr == nil {
- isConnected = true
- } else {
- time.Sleep(7 * time.Second)
+ for _, master := range fs.option.Masters {
+ readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ if fs.option.DefaultReplication == "" {
+ fs.option.DefaultReplication = resp.DefaultReplication
+ }
+ return nil
+ })
+ if readErr == nil {
+ isConnected = true
+ } else {
+ time.Sleep(7 * time.Second)
+ }
}
}
- if metricsAddress == "" && metricsIntervalSec <= 0 {
- return
- }
- go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
- func() (addr string, intervalSeconds int) {
- return metricsAddress, metricsIntervalSec
- })
-}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
- err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
- }
- metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
- return nil
- })
- return
}