diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-11-16 16:30:19 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-11-16 16:30:19 +0800 |
| commit | 218561c206943d179740a80d8bb21e1f42b35daa (patch) | |
| tree | efabb31545c495648d99d4b2eb7ea7536d17d8e3 /weed/filer | |
| parent | e0d5207ed9ff6350e83497586ac9859d841a711a (diff) | |
| parent | c0d279c54e56882bc0ecdf496ecfcbcfa1c6d6e3 (diff) | |
| download | seaweedfs-218561c206943d179740a80d8bb21e1f42b35daa.tar.xz seaweedfs-218561c206943d179740a80d8bb21e1f42b35daa.zip | |
Merge pull request #37 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filer.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer_conf.go | 139 | ||||
| -rw-r--r-- | weed/filer/filer_conf_test.go | 29 | ||||
| -rw-r--r-- | weed/filer/filer_notify_append.go | 10 | ||||
| -rw-r--r-- | weed/filer/filer_on_meta_event.go | 61 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 3 |
6 files changed, 240 insertions, 4 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 0c4d610c5..105c8e04f 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -41,6 +41,7 @@ type Filer struct { metaLogReplication string MetaAggregator *MetaAggregator Signature int32 + FilerConf *FilerConf } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -49,6 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, + FilerConf: NewFilerConf(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go new file mode 100644 index 000000000..182449d49 --- /dev/null +++ b/weed/filer/filer_conf.go @@ -0,0 +1,139 @@ +package filer + +import ( + "bytes" + "context" + "io" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/viant/ptrie" +) + +const ( + DirectoryEtc = "/etc" + FilerConfName = "filer.conf" +) + +type FilerConf struct { + rules ptrie.Trie +} + +func NewFilerConf() (fc *FilerConf) { + fc = &FilerConf{ + rules: ptrie.New(), + } + return fc +} + +func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) { + filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName) + entry, err := filer.FindEntry(context.Background(), filerConfPath) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + glog.Errorf("read filer conf entry %s: %v", filerConfPath, err) + return + } + + return fc.loadFromChunks(filer, entry.Chunks) +} + +func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) { + data, err := filer.readEntry(chunks) + if err != nil { + glog.Errorf("read filer conf content: %v", err) + return + } + + return fc.LoadFromBytes(data) +} + +func (fc *FilerConf) LoadFromBytes(data []byte) (err error) { + conf := &filer_pb.FilerConf{} + + if err := jsonpb.Unmarshal(bytes.NewReader(data), conf); err != nil { + + err = proto.UnmarshalText(string(data), conf) + if err != nil { + glog.Errorf("unable to parse filer conf: %v", err) + // this is not recoverable + return nil + } + + return nil + } + + return fc.doLoadConf(conf) +} + +func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) { + for _, location := range conf.Locations { + err = fc.AddLocationConf(location) + if err != nil { + // this is not recoverable + return nil + } + } + return nil +} + +func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error) { + err = fc.rules.Put([]byte(locConf.LocationPrefix), locConf) + if err != nil { + glog.Errorf("put location prefix: %v", err) + } + return +} + +func (fc *FilerConf) DeleteLocationConf(locationPrefix string) { + rules := ptrie.New() + fc.rules.Walk(func(key []byte, value interface{}) bool { + if string(key) == locationPrefix { + return true + } + rules.Put(key, value) + return true + }) + fc.rules = rules + return +} + +var ( + EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{} +) + +func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf) { + fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { + pathConf = value.(*filer_pb.FilerConf_PathConf) + return true + }) + if pathConf == nil { + return EmptyFilerConfPathConf + } + return pathConf +} + +func (fc *FilerConf) ToProto() *filer_pb.FilerConf { + m := &filer_pb.FilerConf{} + fc.rules.Walk(func(key []byte, value interface{}) bool { + pathConf := value.(*filer_pb.FilerConf_PathConf) + m.Locations = append(m.Locations, pathConf) + return true + }) + return m +} + +func (fc *FilerConf) ToText(writer io.Writer) error { + + m := jsonpb.Marshaler{ + EmitDefaults: false, + Indent: " ", + } + + return m.Marshal(writer, fc.ToProto()) +} diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go new file mode 100644 index 000000000..1bfe8bcfe --- /dev/null +++ b/weed/filer/filer_conf_test.go @@ -0,0 +1,29 @@ +package filer + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/stretchr/testify/assert" +) + +func TestFilerConf(t *testing.T) { + + fc := NewFilerConf() + + conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{ + { + LocationPrefix: "/buckets/abc", + Collection: "abc", + }, + { + LocationPrefix: "/buckets/abcd", + Collection: "abcd", + }, + }} + fc.doLoadConf(conf) + + assert.Equal(t, "abc", fc.MatchStorageRule("/buckets/abc/jasdf").Collection) + assert.Equal(t, "abcd", fc.MatchStorageRule("/buckets/abcd/jasdf").Collection) + +} diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index b1836b046..19da97f6c 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -13,7 +13,7 @@ import ( func (f *Filer) appendToFile(targetFile string, data []byte) error { - assignResult, uploadResult, err2 := f.assignAndUpload(data) + assignResult, uploadResult, err2 := f.assignAndUpload(targetFile, data) if err2 != nil { return err2 } @@ -46,14 +46,16 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { return err } -func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { +func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { // assign a volume location + rule := f.FilerConf.MatchStorageRule(targetFile) assignRequest := &operation.VolumeAssignRequest{ Count: 1, - Collection: f.metaLogCollection, - Replication: f.metaLogReplication, + Collection: util.Nvl(f.metaLogCollection, rule.Collection), + Replication: util.Nvl(f.metaLogReplication, rule.Replication), WritableVolumeCount: 1, } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) if err != nil { return nil, nil, fmt.Errorf("AssignVolume: %v", err) diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go new file mode 100644 index 000000000..3de27da6e --- /dev/null +++ b/weed/filer/filer_on_meta_event.go @@ -0,0 +1,61 @@ +package filer + +import ( + "bytes" + "math" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers +func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { + if DirectoryEtc != event.Directory { + if DirectoryEtc != event.EventNotification.NewParentPath { + return + } + } + + entry := event.EventNotification.NewEntry + if entry == nil { + return + } + + glog.V(0).Infof("procesing %v", event) + if entry.Name == FilerConfName { + f.reloadFilerConfiguration(entry) + } + +} + +func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { + var buf bytes.Buffer + err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) { + fc := NewFilerConf() + err := fc.loadFromChunks(f, entry.Chunks) + if err != nil { + glog.Errorf("read filer conf chunks: %v", err) + return + } + f.FilerConf = fc +} + +func (f *Filer) LoadFilerConf() { + fc := NewFilerConf() + err := util.Retry("loadFilerConf", func() error { + return fc.loadFromFiler(f) + }) + if err != nil { + glog.Errorf("read filer conf: %v", err) + return + } + f.FilerConf = fc +} diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index b90457339..9437e9992 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -141,6 +141,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string return fmt.Errorf("process %v: %v", resp, err) } lastTsNs = resp.TsNs + + f.onMetadataChangeEvent(resp) + } }) if err != nil { |
