aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_conf.go60
-rw-r--r--weed/filer/filer_conf_test.go6
-rw-r--r--weed/filer/filer_on_meta_event.go31
-rw-r--r--weed/server/filer_server.go2
5 files changed, 77 insertions, 24 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 0c4d610c5..105c8e04f 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -41,6 +41,7 @@ type Filer struct {
metaLogReplication string
MetaAggregator *MetaAggregator
Signature int32
+ FilerConf *FilerConf
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -49,6 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
+ FilerConf: NewFilerConf(),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 079794def..737b1aeea 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -1,48 +1,92 @@
package filer
import (
+ "context"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"github.com/viant/ptrie"
)
+
+const (
+ DirectoryEtc = "/etc"
+ FilerConfName = "filer.conf"
+)
+
type FilerConf struct {
rules ptrie.Trie
}
-func NewFilerConf(data []byte) (fc *FilerConf) {
+func NewFilerConf() (fc *FilerConf) {
fc = &FilerConf{
rules: ptrie.New(),
}
+ return fc
+}
+
+func (fc *FilerConf) loadFromFiler(filer *Filer) (err error){
+ filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName)
+ entry, err := filer.FindEntry(context.Background(), filerConfPath)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ return nil
+ }
+ glog.Errorf("read filer conf entry %s: %v", filerConfPath, err)
+ return
+ }
+
+ return fc.loadFromChunks(filer, entry.Chunks)
+}
+
+func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) {
+ data, err := filer.readEntry(chunks)
+ if err != nil {
+ glog.Errorf("read filer conf content: %v", err)
+ return
+ }
+
+ return fc.loadFromBytes(data)
+}
+
+func (fc *FilerConf) loadFromBytes(data []byte) (err error) {
conf := &filer_pb.FilerConf{}
- err := proto.UnmarshalText(string(data), conf)
+ err = proto.UnmarshalText(string(data), conf)
if err != nil {
glog.Errorf("unable to parse filer conf: %v", err)
- return
+ // this is not recoverable
+ return nil
}
- fc.doLoadConf(conf)
- return fc
+ return fc.doLoadConf(conf)
}
-func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) {
+func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) {
for _, location := range conf.Locations {
- err := fc.rules.Put([]byte(location.LocationPrefix), location)
+ err = fc.rules.Put([]byte(location.LocationPrefix), location)
if err != nil {
glog.Errorf("put location prefix: %v", err)
+ // this is not recoverable
+ return nil
}
}
+ return nil
}
+var (
+ EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{}
+)
+
func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf){
fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
pathConf = value.(*filer_pb.FilerConf_PathConf)
return true
})
if pathConf == nil {
- return &filer_pb.FilerConf_PathConf{}
+ return EmptyFilerConfPathConf
}
return pathConf
}
diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go
index 6a7b875d3..1bfe8bcfe 100644
--- a/weed/filer/filer_conf_test.go
+++ b/weed/filer/filer_conf_test.go
@@ -5,14 +5,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/stretchr/testify/assert"
- "github.com/viant/ptrie"
)
func TestFilerConf(t *testing.T) {
- fc := &FilerConf{
- rules: ptrie.New(),
- }
+ fc := NewFilerConf()
+
conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{
{
LocationPrefix: "/buckets/abc",
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index f0b64f4e2..3de27da6e 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -6,10 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-const (
- DirectoryEtc = "/etc"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
@@ -26,15 +23,15 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
}
glog.V(0).Infof("procesing %v", event)
- if entry.Name == "filer.conf" {
+ if entry.Name == FilerConfName {
f.reloadFilerConfiguration(entry)
}
}
-func (f *Filer) readEntry(entry *filer_pb.Entry) ([]byte, error){
+func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
var buf bytes.Buffer
- err := StreamContent(f.MasterClient, &buf, entry.Chunks, 0, math.MaxInt64)
+ err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64)
if err != nil {
return nil, err
}
@@ -42,13 +39,23 @@ func (f *Filer) readEntry(entry *filer_pb.Entry) ([]byte, error){
}
func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
- data, err := f.readEntry(entry)
+ fc := NewFilerConf()
+ err := fc.loadFromChunks(f, entry.Chunks)
if err != nil {
- glog.Warningf("read entry %s: %v", entry.Name, err)
+ glog.Errorf("read filer conf chunks: %v", err)
return
-
}
+ f.FilerConf = fc
+}
- println(string(data))
-
+func (f *Filer) LoadFilerConf() {
+ fc := NewFilerConf()
+ err := util.Retry("loadFilerConf", func() error {
+ return fc.loadFromFiler(f)
+ })
+ if err != nil {
+ glog.Errorf("read filer conf: %v", err)
+ return
+ }
+ f.FilerConf = fc
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index dc93ae062..ba2d9989c 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -131,6 +131,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.LoadBuckets()
+ fs.filer.LoadFilerConf()
+
grace.OnInterrupt(func() {
fs.filer.Shutdown()
})