aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filer.go')
-rw-r--r--weed/filer2/filer.go108
1 files changed, 84 insertions, 24 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index a0af942e0..666ab8fe4 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
- "path/filepath"
"strings"
"time"
@@ -13,6 +12,9 @@ import (
"github.com/karlseguin/ccache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -24,20 +26,30 @@ var (
)
type Filer struct {
- store *FilerStoreWrapper
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
- fileIdDeletionChan chan string
- GrpcDialOption grpc.DialOption
+ store *FilerStoreWrapper
+ directoryCache *ccache.Cache
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionQueue *util.UnboundedQueue
+ GrpcDialOption grpc.DialOption
+ DirBucketsPath string
+ FsyncBuckets []string
+ buckets *FilerBuckets
+ Cipher bool
+ MetaLogBuffer *log_buffer.LogBuffer
+ metaLogCollection string
+ metaLogReplication string
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
- fileIdDeletionChan: make(chan string, PaginationSize),
- GrpcDialOption: grpcDialOption,
+ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
+ fileIdDeletionQueue: util.NewUnboundedQueue(),
+ GrpcDialOption: grpcDialOption,
}
+ f.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+ f.metaLogCollection = collection
+ f.metaLogReplication = replication
go f.loopProcessingDeletion()
@@ -85,7 +97,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
var lastDirectoryEntry *Entry
for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...))
+ dirPath := "/" + util.Join(dirParts[:i]...)
// fmt.Printf("%d directory: %+v\n", i, dirPath)
// first check local cache
@@ -94,7 +106,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
// not found, check the store directly
if dirEntry == nil {
glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
+ dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
} else {
// glog.V(4).Infof("found cached directory: %s", dirPath)
}
@@ -106,24 +118,29 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
now := time.Now()
dirEntry = &Entry{
- FullPath: FullPath(dirPath),
+ FullPath: util.FullPath(dirPath),
Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0770,
- Uid: entry.Uid,
- Gid: entry.Gid,
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
},
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
+ f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(nil, dirEntry, false)
}
@@ -174,6 +191,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
}
}
+ f.maybeAddBucket(entry)
f.NotifyUpdateEvent(oldEntry, entry, true)
f.deleteChunksIfNotNew(oldEntry, entry)
@@ -197,7 +215,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
return f.store.UpdateEntry(ctx, entry)
}
-func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
+func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
now := time.Now()
@@ -213,14 +231,51 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
},
}, nil
}
- return f.store.FindEntry(ctx, p)
+ entry, err = f.store.FindEntry(ctx, p)
+ if entry != nil && entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ return nil, filer_pb.ErrNotFound
+ }
+ }
+ return
+
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+
+ var makeupEntries []*Entry
+ entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ for expiredCount > 0 && err == nil {
+ makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount)
+ if err == nil {
+ entries = append(entries, makeupEntries...)
+ }
+ }
+
+ return entries, err
+}
+
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
+ listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ if listErr != nil {
+ return listedEntries, expiredCount, "", listErr
+ }
+ for _, entry := range listedEntries {
+ lastFileName = entry.Name()
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ expiredCount++
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ return
}
func (f *Filer) cacheDelDirectory(dirpath string) {
@@ -261,3 +316,8 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
}
+
+func (f *Filer) Shutdown() {
+ f.MetaLogBuffer.Shutdown()
+ f.store.Shutdown()
+}