aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_conf.go139
-rw-r--r--weed/filer/filer_conf_test.go29
-rw-r--r--weed/filer/filer_notify_append.go10
-rw-r--r--weed/filer/filer_on_meta_event.go61
-rw-r--r--weed/filer/meta_aggregator.go3
6 files changed, 240 insertions, 4 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 0c4d610c5..105c8e04f 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -41,6 +41,7 @@ type Filer struct {
metaLogReplication string
MetaAggregator *MetaAggregator
Signature int32
+ FilerConf *FilerConf
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -49,6 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
+ FilerConf: NewFilerConf(),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
new file mode 100644
index 000000000..182449d49
--- /dev/null
+++ b/weed/filer/filer_conf.go
@@ -0,0 +1,139 @@
+package filer
+
+import (
+ "bytes"
+ "context"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
+ "github.com/viant/ptrie"
+)
+
+const (
+ DirectoryEtc = "/etc"
+ FilerConfName = "filer.conf"
+)
+
+type FilerConf struct {
+ rules ptrie.Trie
+}
+
+func NewFilerConf() (fc *FilerConf) {
+ fc = &FilerConf{
+ rules: ptrie.New(),
+ }
+ return fc
+}
+
+func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) {
+ filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName)
+ entry, err := filer.FindEntry(context.Background(), filerConfPath)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ return nil
+ }
+ glog.Errorf("read filer conf entry %s: %v", filerConfPath, err)
+ return
+ }
+
+ return fc.loadFromChunks(filer, entry.Chunks)
+}
+
+func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) {
+ data, err := filer.readEntry(chunks)
+ if err != nil {
+ glog.Errorf("read filer conf content: %v", err)
+ return
+ }
+
+ return fc.LoadFromBytes(data)
+}
+
+func (fc *FilerConf) LoadFromBytes(data []byte) (err error) {
+ conf := &filer_pb.FilerConf{}
+
+ if err := jsonpb.Unmarshal(bytes.NewReader(data), conf); err != nil {
+
+ err = proto.UnmarshalText(string(data), conf)
+ if err != nil {
+ glog.Errorf("unable to parse filer conf: %v", err)
+ // this is not recoverable
+ return nil
+ }
+
+ return nil
+ }
+
+ return fc.doLoadConf(conf)
+}
+
+func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) {
+ for _, location := range conf.Locations {
+ err = fc.AddLocationConf(location)
+ if err != nil {
+ // this is not recoverable
+ return nil
+ }
+ }
+ return nil
+}
+
+func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error) {
+ err = fc.rules.Put([]byte(locConf.LocationPrefix), locConf)
+ if err != nil {
+ glog.Errorf("put location prefix: %v", err)
+ }
+ return
+}
+
+func (fc *FilerConf) DeleteLocationConf(locationPrefix string) {
+ rules := ptrie.New()
+ fc.rules.Walk(func(key []byte, value interface{}) bool {
+ if string(key) == locationPrefix {
+ return true
+ }
+ rules.Put(key, value)
+ return true
+ })
+ fc.rules = rules
+ return
+}
+
+var (
+ EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{}
+)
+
+func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf) {
+ fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
+ pathConf = value.(*filer_pb.FilerConf_PathConf)
+ return true
+ })
+ if pathConf == nil {
+ return EmptyFilerConfPathConf
+ }
+ return pathConf
+}
+
+func (fc *FilerConf) ToProto() *filer_pb.FilerConf {
+ m := &filer_pb.FilerConf{}
+ fc.rules.Walk(func(key []byte, value interface{}) bool {
+ pathConf := value.(*filer_pb.FilerConf_PathConf)
+ m.Locations = append(m.Locations, pathConf)
+ return true
+ })
+ return m
+}
+
+func (fc *FilerConf) ToText(writer io.Writer) error {
+
+ m := jsonpb.Marshaler{
+ EmitDefaults: false,
+ Indent: " ",
+ }
+
+ return m.Marshal(writer, fc.ToProto())
+}
diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go
new file mode 100644
index 000000000..1bfe8bcfe
--- /dev/null
+++ b/weed/filer/filer_conf_test.go
@@ -0,0 +1,29 @@
+package filer
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestFilerConf(t *testing.T) {
+
+ fc := NewFilerConf()
+
+ conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{
+ {
+ LocationPrefix: "/buckets/abc",
+ Collection: "abc",
+ },
+ {
+ LocationPrefix: "/buckets/abcd",
+ Collection: "abcd",
+ },
+ }}
+ fc.doLoadConf(conf)
+
+ assert.Equal(t, "abc", fc.MatchStorageRule("/buckets/abc/jasdf").Collection)
+ assert.Equal(t, "abcd", fc.MatchStorageRule("/buckets/abcd/jasdf").Collection)
+
+}
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index b1836b046..19da97f6c 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -13,7 +13,7 @@ import (
func (f *Filer) appendToFile(targetFile string, data []byte) error {
- assignResult, uploadResult, err2 := f.assignAndUpload(data)
+ assignResult, uploadResult, err2 := f.assignAndUpload(targetFile, data)
if err2 != nil {
return err2
}
@@ -46,14 +46,16 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
return err
}
-func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
// assign a volume location
+ rule := f.FilerConf.MatchStorageRule(targetFile)
assignRequest := &operation.VolumeAssignRequest{
Count: 1,
- Collection: f.metaLogCollection,
- Replication: f.metaLogReplication,
+ Collection: util.Nvl(f.metaLogCollection, rule.Collection),
+ Replication: util.Nvl(f.metaLogReplication, rule.Replication),
WritableVolumeCount: 1,
}
+
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
if err != nil {
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
new file mode 100644
index 000000000..3de27da6e
--- /dev/null
+++ b/weed/filer/filer_on_meta_event.go
@@ -0,0 +1,61 @@
+package filer
+
+import (
+ "bytes"
+ "math"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
+func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
+ if DirectoryEtc != event.Directory {
+ if DirectoryEtc != event.EventNotification.NewParentPath {
+ return
+ }
+ }
+
+ entry := event.EventNotification.NewEntry
+ if entry == nil {
+ return
+ }
+
+ glog.V(0).Infof("procesing %v", event)
+ if entry.Name == FilerConfName {
+ f.reloadFilerConfiguration(entry)
+ }
+
+}
+
+func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
+ var buf bytes.Buffer
+ err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64)
+ if err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
+ fc := NewFilerConf()
+ err := fc.loadFromChunks(f, entry.Chunks)
+ if err != nil {
+ glog.Errorf("read filer conf chunks: %v", err)
+ return
+ }
+ f.FilerConf = fc
+}
+
+func (f *Filer) LoadFilerConf() {
+ fc := NewFilerConf()
+ err := util.Retry("loadFilerConf", func() error {
+ return fc.loadFromFiler(f)
+ })
+ if err != nil {
+ glog.Errorf("read filer conf: %v", err)
+ return
+ }
+ f.FilerConf = fc
+}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index b90457339..9437e9992 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -141,6 +141,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
return fmt.Errorf("process %v: %v", resp, err)
}
lastTsNs = resp.TsNs
+
+ f.onMetadataChangeEvent(resp)
+
}
})
if err != nil {