diff options
Diffstat (limited to 'weed/server/filer_server.go')
| -rw-r--r-- | weed/server/filer_server.go | 73 |
1 files changed, 34 insertions, 39 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 6995c7cfe..59c149cef 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "net/http" "os" "sync" @@ -15,19 +16,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/filer2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" - _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" - _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" - _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" - _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" - _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2" + "github.com/chrislusf/seaweedfs/weed/filer" + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" @@ -58,9 +59,13 @@ type FilerOption struct { type FilerServer struct { option *FilerOption secret security.SigningKey - filer *filer2.Filer + filer *filer.Filer grpcDialOption grpc.DialOption + // metrics read from the master + metricsAddress string + metricsIntervalSec int + // notifying clients listenersLock sync.Mutex listenersCond *sync.Cond @@ -82,13 +87,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher - maybeStartMetrics(fs, option) + fs.checkWithMaster() + go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepConnectedToMaster() v := util.GetViper() @@ -130,9 +136,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) return fs, nil } -func maybeStartMetrics(fs *FilerServer, option *FilerOption) { +func (fs *FilerServer) checkWithMaster() { - for _, master := range option.Masters { + for _, master := range fs.option.Masters { _, err := pb.ParseFilerGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) @@ -140,12 +146,19 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { } isConnected := false - var metricsAddress string - var metricsIntervalSec int - var readErr error for !isConnected { - for _, master := range option.Masters { - metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) + for _, master := range fs.option.Masters { + readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", master, err) + } + fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + if fs.option.DefaultReplication == "" { + fs.option.DefaultReplication = resp.DefaultReplication + } + return nil + }) if readErr == nil { isConnected = true } else { @@ -153,23 +166,5 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { } } } - if metricsAddress == "" && metricsIntervalSec <= 0 { - return - } - go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, - func() (addr string, intervalSeconds int) { - return metricsAddress, metricsIntervalSec - }) -} -func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { - err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get master %s configuration: %v", masterAddress, err) - } - metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) - return nil - }) - return } |
