diff options
Diffstat (limited to 'weed/filer2/filer.go')
| -rw-r--r-- | weed/filer2/filer.go | 108 |
1 files changed, 84 insertions, 24 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index a0af942e0..666ab8fe4 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "path/filepath" "strings" "time" @@ -13,6 +12,9 @@ import ( "github.com/karlseguin/ccache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -24,20 +26,30 @@ var ( ) type Filer struct { - store *FilerStoreWrapper - directoryCache *ccache.Cache - MasterClient *wdclient.MasterClient - fileIdDeletionChan chan string - GrpcDialOption grpc.DialOption + store *FilerStoreWrapper + directoryCache *ccache.Cache + MasterClient *wdclient.MasterClient + fileIdDeletionQueue *util.UnboundedQueue + GrpcDialOption grpc.DialOption + DirBucketsPath string + FsyncBuckets []string + buckets *FilerBuckets + Cipher bool + MetaLogBuffer *log_buffer.LogBuffer + metaLogCollection string + metaLogReplication string } -func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { f := &Filer{ - directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), - fileIdDeletionChan: make(chan string, PaginationSize), - GrpcDialOption: grpcDialOption, + directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), + fileIdDeletionQueue: util.NewUnboundedQueue(), + GrpcDialOption: grpcDialOption, } + f.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.metaLogCollection = collection + f.metaLogReplication = replication go f.loopProcessingDeletion() @@ -85,7 +97,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro var lastDirectoryEntry *Entry for i := 1; i < len(dirParts); i++ { - dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...)) + dirPath := "/" + util.Join(dirParts[:i]...) // fmt.Printf("%d directory: %+v\n", i, dirPath) // first check local cache @@ -94,7 +106,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro // not found, check the store directly if dirEntry == nil { glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) + dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath)) } else { // glog.V(4).Infof("found cached directory: %s", dirPath) } @@ -106,24 +118,29 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro now := time.Now() dirEntry = &Entry{ - FullPath: FullPath(dirPath), + FullPath: util.FullPath(dirPath), Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | 0770, - Uid: entry.Uid, - Gid: entry.Gid, + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0110, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, + UserName: entry.UserName, + GroupNames: entry.GroupNames, }, } glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) mkdirErr := f.store.InsertEntry(ctx, dirEntry) if mkdirErr != nil { - if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { + f.maybeAddBucket(dirEntry) f.NotifyUpdateEvent(nil, dirEntry, false) } @@ -174,6 +191,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro } } + f.maybeAddBucket(entry) f.NotifyUpdateEvent(oldEntry, entry, true) f.deleteChunksIfNotNew(oldEntry, entry) @@ -197,7 +215,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er return f.store.UpdateEntry(ctx, entry) } -func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) { +func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) { now := time.Now() @@ -213,14 +231,51 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er }, }, nil } - return f.store.FindEntry(ctx, p) + entry, err = f.store.FindEntry(ctx, p) + if entry != nil && entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + return nil, filer_pb.ErrNotFound + } + } + return + } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } - return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + + var makeupEntries []*Entry + entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + for expiredCount > 0 && err == nil { + makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount) + if err == nil { + entries = append(entries, makeupEntries...) + } + } + + return entries, err +} + +func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) { + listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + if listErr != nil { + return listedEntries, expiredCount, "", listErr + } + for _, entry := range listedEntries { + lastFileName = entry.Name() + if entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + expiredCount++ + continue + } + } + entries = append(entries, entry) + } + return } func (f *Filer) cacheDelDirectory(dirpath string) { @@ -261,3 +316,8 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute) } + +func (f *Filer) Shutdown() { + f.MetaLogBuffer.Shutdown() + f.store.Shutdown() +} |
