diff options
Diffstat (limited to 'weed/filer2')
30 files changed, 752 insertions, 572 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 3e8554957..864c858d3 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -7,16 +7,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) type AbstractSqlStore struct { - DB *sql.DB - SqlInsert string - SqlUpdate string - SqlFind string - SqlDelete string - SqlListExclusive string - SqlListInclusive string + DB *sql.DB + SqlInsert string + SqlUpdate string + SqlFind string + SqlDelete string + SqlDeleteFolderChildren string + SqlListExclusive string + SqlListInclusive string } type TxOrDB interface { @@ -64,7 +67,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.En return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) if err != nil { return fmt.Errorf("insert %s: %s", entry.FullPath, err) } @@ -84,7 +87,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("update %s: %s", entry.FullPath, err) } @@ -99,10 +102,10 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() - row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir) + row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } entry := &filer2.Entry{ @@ -119,7 +122,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2. dir, name := fullpath.DirAndName() - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("delete %s: %s", fullpath, err) } @@ -132,6 +135,21 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2. return nil } +func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { + + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + } + + return nil +} + func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { sqlText := store.SqlListExclusive @@ -139,7 +157,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat sqlText = store.SqlListInclusive } - rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) + rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), limit) if err != nil { return nil, fmt.Errorf("list %s : %v", fullpath, err) } diff --git a/weed/filer2/abstract_sql/hashing.go b/weed/filer2/abstract_sql/hashing.go deleted file mode 100644 index 5c982c537..000000000 --- a/weed/filer2/abstract_sql/hashing.go +++ /dev/null @@ -1,32 +0,0 @@ -package abstract_sql - -import ( - "crypto/md5" - "io" -) - -// returns a 64 bit big int -func hashToLong(dir string) (v int64) { - h := md5.New() - io.WriteString(h, dir) - - b := h.Sum(nil) - - v += int64(b[0]) - v <<= 8 - v += int64(b[1]) - v <<= 8 - v += int64(b[2]) - v <<= 8 - v += int64(b[3]) - v <<= 8 - v += int64(b[4]) - v <<= 8 - v += int64(b[5]) - v <<= 8 - v += int64(b[6]) - v <<= 8 - v += int64(b[7]) - - return -} diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index 466be5bf3..6f25fffec 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -3,10 +3,13 @@ package cassandra import ( "context" "fmt" + + "github.com/gocql/gocql" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gocql/gocql" ) func init() { @@ -22,10 +25,10 @@ func (store *CassandraStore) GetName() string { return "cassandra" } -func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) { +func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("keyspace"), - configuration.GetStringSlice("hosts"), + configuration.GetString(prefix+"keyspace"), + configuration.GetStringSlice(prefix+"hosts"), ) } @@ -80,12 +83,12 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full "SELECT meta FROM filemeta WHERE directory=? AND name=?", dir, name).Consistency(gocql.One).Scan(&data); err != nil { if err != gocql.ErrNotFound { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } } if len(data) == 0 { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } entry = &filer2.Entry{ @@ -112,6 +115,17 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu return nil } +func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { + + if err := store.session.Query( + "DELETE FROM filemeta WHERE directory=?", + fullpath).Exec(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go index 7b05b53dc..a174117ea 100644 --- a/weed/filer2/configuration.go +++ b/weed/filer2/configuration.go @@ -17,8 +17,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) { for _, store := range Stores { if config.GetBool(store.GetName() + ".enabled") { - viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { + if err := store.Initialize(config, store.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize store for %s: %+v", store.GetName(), err) } diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go index 3f8a19114..c901927bb 100644 --- a/weed/filer2/entry.go +++ b/weed/filer2/entry.go @@ -30,6 +30,7 @@ type Entry struct { FullPath Attr + Extended map[string][]byte // the following is for files Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` @@ -56,6 +57,7 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry { IsDirectory: entry.IsDirectory(), Attributes: EntryAttributeToPb(entry), Chunks: entry.Chunks, + Extended: entry.Extended, } } diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go index cf4627b74..3a2dc6134 100644 --- a/weed/filer2/entry_codec.go +++ b/weed/filer2/entry_codec.go @@ -1,18 +1,21 @@ package filer2 import ( + "bytes" + "fmt" "os" "time" - "fmt" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { message := &filer_pb.Entry{ Attributes: EntryAttributeToPb(entry), Chunks: entry.Chunks, + Extended: entry.Extended, } return proto.Marshal(message) } @@ -27,6 +30,8 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { entry.Attr = PbToEntryAttribute(message.Attributes) + entry.Extended = message.Extended + entry.Chunks = message.Chunks return nil @@ -84,6 +89,10 @@ func EqualEntry(a, b *Entry) bool { return false } + if !eq(a.Extended, b.Extended) { + return false + } + for i := 0; i < len(a.Chunks); i++ { if !proto.Equal(a.Chunks[i], b.Chunks[i]) { return false @@ -91,3 +100,17 @@ func EqualEntry(a, b *Entry) bool { } return true } + +func eq(a, b map[string][]byte) bool { + if len(a) != len(b) { + return false + } + + for k, v := range a { + if w, ok := b[k]; !ok || !bytes.Equal(v, w) { + return false + } + } + + return true +} diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go index 1b0f928d0..83a6ddc5d 100644 --- a/weed/filer2/etcd/etcd_store.go +++ b/weed/filer2/etcd/etcd_store.go @@ -6,10 +6,12 @@ import ( "strings" "time" + "go.etcd.io/etcd/clientv3" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" - "go.etcd.io/etcd/clientv3" ) const ( @@ -28,13 +30,13 @@ func (store *EtcdStore) GetName() string { return "etcd" } -func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) { - servers := configuration.GetString("servers") +func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + servers := configuration.GetString(prefix + "servers") if servers == "" { servers = "localhost:2379" } - timeout := configuration.GetString("timeout") + timeout := configuration.GetString(prefix + "timeout") if timeout == "" { timeout = "3s" } @@ -99,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) } if len(resp.Kvs) == 0 { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } entry = &filer2.Entry{ @@ -123,6 +125,16 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return nil } +func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { + return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err) + } + + return nil +} + func (store *EtcdStore) ListDirectoryEntries( ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int, ) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index b5876df82..711488df1 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -71,6 +71,8 @@ type ChunkView struct { Size uint64 LogicOffset int64 IsFullChunk bool + CipherKey []byte + isGzipped bool } func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) { @@ -86,6 +88,7 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int stop := offset + int64(size) for _, chunk := range visibles { + if chunk.start <= offset && offset < chunk.stop && offset < stop { isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop views = append(views, &ChunkView{ @@ -94,6 +97,8 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int Size: uint64(min(chunk.stop, stop) - offset), LogicOffset: offset, IsFullChunk: isFullChunk, + CipherKey: chunk.cipherKey, + isGzipped: chunk.isGzipped, }) offset = min(chunk.stop, stop) } @@ -120,13 +125,7 @@ var bufPool = sync.Pool{ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { - newV := newVisibleInterval( - chunk.Offset, - chunk.Offset+int64(chunk.Size), - chunk.GetFileIdString(), - chunk.Mtime, - true, - ) + newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped) length := len(visibles) if length == 0 { @@ -140,23 +139,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. logPrintf(" before", visibles) for _, v := range visibles { if v.start < chunk.Offset && chunk.Offset < v.stop { - newVisibles = append(newVisibles, newVisibleInterval( - v.start, - chunk.Offset, - v.fileId, - v.modifiedTime, - false, - )) + newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped)) } chunkStop := chunk.Offset + int64(chunk.Size) if v.start < chunkStop && chunkStop < v.stop { - newVisibles = append(newVisibles, newVisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false, - )) + newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped)) } if chunkStop <= v.start || v.stop <= chunk.Offset { newVisibles = append(newVisibles, v) @@ -187,6 +174,7 @@ func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []Vi var newVisibles []VisibleInterval for _, chunk := range chunks { + newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk) t := visibles[:0] visibles = newVisibles @@ -208,15 +196,19 @@ type VisibleInterval struct { modifiedTime int64 fileId string isFullChunk bool + cipherKey []byte + isGzipped bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval { +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval { return VisibleInterval{ start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime, isFullChunk: isFullChunk, + cipherKey: cipherKey, + isGzipped: isGzipped, } } diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index e75e60753..bb4a6c74d 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -331,6 +331,42 @@ func TestChunksReading(t *testing.T) { {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, }, }, + // case 8: edge cases + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 90, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 190, Size: 300, FileId: "fsad", Mtime: 353}, + }, + Offset: 0, + Size: 300, + Expected: []*ChunkView{ + {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0}, + {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90}, + {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190}, + }, + }, + // case 9: edge cases + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1}, + {Offset: 43175936, Size: 52981771 - 43175936, FileId: "2,112a36ea7f85", Mtime: 2}, + {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3}, + {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4}, + {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5}, + {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6}, + }, + Offset: 0, + Size: 153578836, + Expected: []*ChunkView{ + {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0}, + {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936}, + {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760}, + {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736}, + {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168}, + {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248}, + }, + }, } for i, testcase := range testcases { diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 672295dea..d3343f610 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -3,37 +3,46 @@ package filer2 import ( "context" "fmt" - "google.golang.org/grpc" - "math" "os" "path/filepath" "strings" "time" + "google.golang.org/grpc" + + "github.com/karlseguin/ccache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/karlseguin/ccache" ) +const PaginationSize = 1024 * 256 + var ( OS_UID = uint32(os.Getuid()) OS_GID = uint32(os.Getgid()) ) type Filer struct { - store *FilerStoreWrapper - directoryCache *ccache.Cache - MasterClient *wdclient.MasterClient - fileIdDeletionChan chan string - GrpcDialOption grpc.DialOption + store *FilerStoreWrapper + directoryCache *ccache.Cache + MasterClient *wdclient.MasterClient + fileIdDeletionQueue *util.UnboundedQueue + GrpcDialOption grpc.DialOption + DirBucketsPath string + DirQueuesPath string + buckets *FilerBuckets + Cipher bool } -func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32) *Filer { f := &Filer{ - directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), - fileIdDeletionChan: make(chan string, 4096), - GrpcDialOption: grpcDialOption, + directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters), + fileIdDeletionQueue: util.NewUnboundedQueue(), + GrpcDialOption: grpcDialOption, } go f.loopProcessingDeletion() @@ -69,7 +78,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error { return f.store.RollbackTransaction(ctx) } -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) error { if string(entry.FullPath) == "/" { return nil @@ -93,7 +102,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { glog.V(4).Infof("find uncached directory: %s", dirPath) dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) } else { - glog.V(4).Infof("found cached directory: %s", dirPath) + // glog.V(4).Infof("found cached directory: %s", dirPath) } // no such existing directory @@ -105,25 +114,30 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { dirEntry = &Entry{ FullPath: FullPath(dirPath), Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | 0770, - Uid: entry.Uid, - Gid: entry.Gid, + Mtime: now, + Crtime: now, + Mode: os.ModeDir | 0770, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, }, } glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) mkdirErr := f.store.InsertEntry(ctx, dirEntry) if mkdirErr != nil { - if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound { + if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == filer_pb.ErrNotFound { + glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { + f.maybeAddBucket(dirEntry) f.NotifyUpdateEvent(nil, dirEntry, false) } } else if !dirEntry.IsDirectory() { + glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) return fmt.Errorf("%s is a file", dirPath) } @@ -138,6 +152,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { } if lastDirectoryEntry == nil { + glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath) return fmt.Errorf("parent folder not found: %v", entry.FullPath) } @@ -151,22 +166,30 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { oldEntry, _ := f.FindEntry(ctx, entry.FullPath) + glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl) if oldEntry == nil { if err := f.store.InsertEntry(ctx, entry); err != nil { glog.Errorf("insert entry %s: %v", entry.FullPath, err) return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) } } else { + if o_excl { + glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath) + return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath) + } if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil { glog.Errorf("update entry %s: %v", entry.FullPath, err) return fmt.Errorf("update entry %s: %v", entry.FullPath, err) } } + f.maybeAddBucket(entry) f.NotifyUpdateEvent(oldEntry, entry, true) f.deleteChunksIfNotNew(oldEntry, entry) + glog.V(4).Infof("CreateEntry %s: created", entry.FullPath) + return nil } @@ -200,75 +223,51 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er }, }, nil } - return f.store.FindEntry(ctx, p) -} - -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { - entry, err := f.FindEntry(ctx, p) - if err != nil { - return err - } - - if entry.IsDirectory() { - limit := int(1) - if isRecursive { - limit = math.MaxInt32 - } - lastFileName := "" - includeLastFile := false - for limit > 0 { - entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024) - if err != nil { - glog.Errorf("list folder %s: %v", p, err) - return fmt.Errorf("list folder %s: %v", p, err) - } - - if len(entries) == 0 { - break - } - - if isRecursive { - for _, sub := range entries { - lastFileName = sub.Name() - err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks) - if err != nil && !ignoreRecursiveError { - return err - } - limit-- - if limit <= 0 { - break - } - } - } - - if len(entries) < 1024 { - break - } + entry, err = f.store.FindEntry(ctx, p) + if entry != nil && entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + return nil, filer_pb.ErrNotFound } - - f.cacheDelDirectory(string(p)) - } + return - if shouldDeleteChunks { - f.DeleteChunks(p, entry.Chunks) - } +} - if p == "/" { - return nil +func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { + if strings.HasSuffix(string(p), "/") && len(p) > 1 { + p = p[0 : len(p)-1] } - glog.V(3).Infof("deleting entry %v", p) - f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) + var makeupEntries []*Entry + entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + for expiredCount > 0 && err == nil { + makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount) + if err == nil { + entries = append(entries, makeupEntries...) + } + } - return f.store.DeleteEntry(ctx, p) + return entries, err } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { - if strings.HasSuffix(string(p), "/") && len(p) > 1 { - p = p[0 : len(p)-1] +func (f *Filer) doListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) { + listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + if listErr != nil { + return listedEntries, expiredCount, "", listErr } - return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + for _, entry := range listedEntries { + lastFileName = entry.Name() + if entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + expiredCount++ + continue + } + } + entries = append(entries, entry) + } + return } func (f *Filer) cacheDelDirectory(dirpath string) { diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go new file mode 100644 index 000000000..601b7dbf3 --- /dev/null +++ b/weed/filer2/filer_buckets.go @@ -0,0 +1,113 @@ +package filer2 + +import ( + "context" + "math" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type BucketName string +type BucketOption struct { + Name BucketName + Replication string +} +type FilerBuckets struct { + dirBucketsPath string + buckets map[BucketName]*BucketOption + sync.RWMutex +} + +func (f *Filer) LoadBuckets(dirBucketsPath string) { + + f.buckets = &FilerBuckets{ + buckets: make(map[BucketName]*BucketOption), + } + f.DirBucketsPath = dirBucketsPath + + limit := math.MaxInt32 + + entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit) + + if err != nil { + glog.V(1).Infof("no buckets found: %v", err) + return + } + + glog.V(1).Infof("buckets found: %d", len(entries)) + + f.buckets.Lock() + for _, entry := range entries { + f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ + Name: BucketName(entry.Name()), + Replication: entry.Replication, + } + } + f.buckets.Unlock() + +} + +func (f *Filer) ReadBucketOption(buketName string) (replication string) { + + f.buckets.RLock() + defer f.buckets.RUnlock() + + option, found := f.buckets.buckets[BucketName(buketName)] + + if !found { + return "" + } + return option.Replication + +} + +func (f *Filer) isBucket(entry *Entry) bool { + if !entry.IsDirectory() { + return false + } + parent, dirName := entry.FullPath.DirAndName() + if parent != f.DirBucketsPath { + return false + } + + f.buckets.RLock() + defer f.buckets.RUnlock() + + _, found := f.buckets.buckets[BucketName(dirName)] + + return found + +} + +func (f *Filer) maybeAddBucket(entry *Entry) { + if !entry.IsDirectory() { + return + } + parent, dirName := entry.FullPath.DirAndName() + if parent != f.DirBucketsPath { + return + } + f.addBucket(dirName, &BucketOption{ + Name: BucketName(dirName), + Replication: entry.Replication, + }) +} + +func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) { + + f.buckets.Lock() + defer f.buckets.Unlock() + + f.buckets.buckets[BucketName(buketName)] = bucketOption + +} + +func (f *Filer) deleteBucket(buketName string) { + + f.buckets.Lock() + defer f.buckets.Unlock() + + delete(f.buckets.buckets, BucketName(buketName)) + +} diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go index 7e093eea2..1c1fa6a5b 100644 --- a/weed/filer2/filer_client_util.go +++ b/weed/filer2/filer_client_util.go @@ -3,6 +3,8 @@ package filer2 import ( "context" "fmt" + "io" + "math" "strings" "sync" @@ -20,10 +22,11 @@ func VolumeId(fileId string) string { } type FilerClient interface { - WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error + WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error + AdjustedUrl(hostAndPort string) string } -func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { +func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { var vids []string for _, chunkView := range chunkViews { vids = append(vids, VolumeId(chunkView.FileId)) @@ -31,10 +34,10 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s vid2Locations := make(map[string]*filer_pb.Locations) - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) if err != nil { @@ -65,20 +68,16 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s return } + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], - !chunkView.IsFullChunk) + n, err = util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)]) if err != nil { - glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) + glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err) err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) + volumeServerAddress, chunkView.FileId, err) return } @@ -91,68 +90,75 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s return } -func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) { +func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) { - dir, name := FullPath(fullFilePath).DirAndName() + dir, name := fullFilePath.DirAndName() - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, } - glog.V(3).Infof("read %s request: %v", fullFilePath, request) - resp, err := client.LookupDirectoryEntry(ctx, request) + // glog.V(3).Infof("read %s request: %v", fullFilePath, request) + resp, err := filer_pb.LookupEntry(client, request) if err != nil { - if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) { + if err == filer_pb.ErrNotFound { return nil } - glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err) + glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err) return err } - if resp.Entry != nil { - entry = resp.Entry + if resp.Entry == nil { + // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry) + return nil } + entry = resp.Entry return nil }) return } -func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) { +func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - paginationLimit := 1024 + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { lastEntryName := "" - for { - - request := &filer_pb.ListEntriesRequest{ - Directory: fullDirPath, - StartFromFileName: lastEntryName, - Limit: uint32(paginationLimit), - } + request := &filer_pb.ListEntriesRequest{ + Directory: string(fullDirPath), + Prefix: prefix, + StartFromFileName: lastEntryName, + Limit: math.MaxUint32, + } - glog.V(3).Infof("read directory: %v", request) - resp, err := client.ListEntries(ctx, request) - if err != nil { - return fmt.Errorf("list %s: %v", fullDirPath, err) - } + glog.V(3).Infof("read directory: %v", request) + stream, err := client.ListEntries(context.Background(), request) + if err != nil { + return fmt.Errorf("list %s: %v", fullDirPath, err) + } - for _, entry := range resp.Entries { - fn(entry) - lastEntryName = entry.Name + var prevEntry *filer_pb.Entry + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + if prevEntry != nil { + fn(prevEntry, true) + } + break + } else { + return recvErr + } } - - if len(resp.Entries) < paginationLimit { - break + if prevEntry != nil { + fn(prevEntry, false) } - + prevEntry = resp.Entry } return nil diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go new file mode 100644 index 000000000..d0792ac66 --- /dev/null +++ b/weed/filer2/filer_delete_entry.go @@ -0,0 +1,125 @@ +package filer2 + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) + +func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { + if p == "/" { + return nil + } + + entry, findErr := f.FindEntry(ctx, p) + if findErr != nil { + return findErr + } + + isCollection := f.isBucket(entry) + + var chunks []*filer_pb.FileChunk + chunks = append(chunks, entry.Chunks...) + if entry.IsDirectory() { + // delete the folder children, not including the folder itself + var dirChunks []*filer_pb.FileChunk + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection) + if err != nil { + glog.V(0).Infof("delete directory %s: %v", p, err) + return fmt.Errorf("delete directory %s: %v", p, err) + } + chunks = append(chunks, dirChunks...) + } + + // delete the file or folder + err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks) + if err != nil { + return fmt.Errorf("delete file %s: %v", p, err) + } + + if shouldDeleteChunks && !isCollection { + go f.DeleteChunks(chunks) + } + if isCollection { + collectionName := entry.Name() + f.doDeleteCollection(collectionName) + f.deleteBucket(collectionName) + } + + return nil +} + +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) { + + lastFileName := "" + includeLastFile := false + for { + entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize) + if err != nil { + glog.Errorf("list folder %s: %v", entry.FullPath, err) + return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) + } + if lastFileName == "" && !isRecursive && len(entries) > 0 { + // only for first iteration in the loop + return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + } + + for _, sub := range entries { + lastFileName = sub.Name() + var dirChunks []*filer_pb.FileChunk + if sub.IsDirectory() { + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks) + chunks = append(chunks, dirChunks...) + } else { + chunks = append(chunks, sub.Chunks...) + } + if err != nil && !ignoreRecursiveError { + return nil, err + } + } + + if len(entries) < PaginationSize { + break + } + } + + f.cacheDelDirectory(string(entry.FullPath)) + + glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) + + if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { + return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) + } + f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) + + return chunks, nil +} + +func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) { + + glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) + + if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { + return fmt.Errorf("filer store delete: %v", storeDeletionErr) + } + f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) + + return nil +} + +func (f *Filer) doDeleteCollection(collectionName string) (err error) { + + return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ + Name: collectionName, + }) + if err != nil { + glog.Infof("delete collection %s: %v", collectionName, err) + } + return err + }) + +} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 25e27e504..3a64f636e 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -10,8 +10,6 @@ import ( func (f *Filer) loopProcessingDeletion() { - ticker := time.NewTicker(5 * time.Second) - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { m := make(map[string]operation.LookupResult) for _, vid := range vids { @@ -31,37 +29,35 @@ func (f *Filer) loopProcessingDeletion() { return m, nil } - var fileIds []string + var deletionCount int for { - select { - case fid := <-f.fileIdDeletionChan: - fileIds = append(fileIds, fid) - if len(fileIds) >= 4096 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] + deletionCount = 0 + f.fileIdDeletionQueue.Consume(func(fileIds []string) { + deletionCount = len(fileIds) + _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) + if err != nil { + glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) + } else { + glog.V(1).Infof("deleting fileIds len=%d", deletionCount) } + }) + + if deletionCount == 0 { + time.Sleep(1123 * time.Millisecond) } } } -func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) { +func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String()) - f.fileIdDeletionChan <- chunk.GetFileIdString() + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) } } // DeleteFileByFileId direct delete by file id. // Only used when the fileId is not being managed by snapshots. func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionChan <- fileId + f.fileIdDeletionQueue.EnQueue(fileId) } func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { @@ -70,7 +66,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { return } if newEntry == nil { - f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks) + f.DeleteChunks(oldEntry.Chunks) } var toDelete []*filer_pb.FileChunk @@ -84,5 +80,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(oldEntry.FullPath, toDelete) + f.DeleteChunks(toDelete) } diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 8caa44ee2..f724f79c2 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -2,7 +2,6 @@ package filer2 import ( "context" - "errors" "time" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -14,12 +13,13 @@ type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found FindEntry(context.Context, FullPath) (entry *Entry, err error) DeleteEntry(context.Context, FullPath) (err error) + DeleteFolderChildren(context.Context, FullPath) (err error) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) BeginTransaction(ctx context.Context) (context.Context, error) @@ -27,8 +27,6 @@ type FilerStore interface { RollbackTransaction(ctx context.Context) error } -var ErrNotFound = errors.New("filer: no entry is found in filer store") - type FilerStoreWrapper struct { actualStore FilerStore } @@ -46,8 +44,8 @@ func (fsw *FilerStoreWrapper) GetName() string { return fsw.actualStore.GetName() } -func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error { - return fsw.actualStore.Initialize(configuration) +func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { + return fsw.actualStore.Initialize(configuration, prefix) } func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { @@ -97,6 +95,16 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err return fsw.actualStore.DeleteEntry(ctx, fp) } +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) + }() + + return fsw.actualStore.DeleteFolderChildren(ctx, fp) +} + func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc() start := time.Now() diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go index 191e51cf3..133069f93 100644 --- a/weed/filer2/fullpath.go +++ b/weed/filer2/fullpath.go @@ -3,6 +3,8 @@ package filer2 import ( "path/filepath" "strings" + + "github.com/chrislusf/seaweedfs/weed/util" ) type FullPath string @@ -34,3 +36,7 @@ func (fp FullPath) Child(name string) FullPath { } return FullPath(dir + "/" + name) } + +func (fp FullPath) AsInode() uint64 { + return uint64(util.HashStringToLong(string(fp))) +} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index d00eba859..807fcb56f 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -5,12 +5,14 @@ import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" ) const ( @@ -29,8 +31,8 @@ func (store *LevelDBStore) GetName() string { return "leveldb" } -func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) { - dir := configuration.GetString("dir") +func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") return store.initialize(dir) } @@ -93,7 +95,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa data, err := store.db.Get(key, nil) if err == leveldb.ErrNotFound { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) @@ -123,6 +125,34 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full return nil } +func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + + batch := new(leveldb.Batch) + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + batch.Delete([]byte(genKey(string(fullpath), fileName))) + } + iter.Release() + + err = store.db.Write(batch, nil) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 904de8c97..497158420 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -9,7 +9,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, 0) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -30,7 +30,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := filer.CreateEntry(ctx, entry1); err != nil { + if err := filer.CreateEntry(ctx, entry1, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -64,7 +64,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, 0) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDBStore{} diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go index 4b47d2eb3..0b07c6833 100644 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -8,12 +8,14 @@ import ( "io" "os" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -29,8 +31,8 @@ func (store *LevelDB2Store) GetName() string { return "leveldb2" } -func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) { - dir := configuration.GetString("dir") +func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") return store.initialize(dir, 8) } @@ -103,7 +105,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP data, err := store.dbs[partitionId].Get(key, nil) if err == leveldb.ErrNotFound { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) @@ -134,6 +136,34 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful return nil } +func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) + + batch := new(leveldb.Batch) + + iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + batch.Delete(append(directoryPrefix, []byte(fileName)...)) + } + iter.Release() + + err = store.dbs[partitionId].Write(batch, nil) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index e28ef7dac..dc94f2ac7 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -9,7 +9,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, 0) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDB2Store{} @@ -30,7 +30,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := filer.CreateEntry(ctx, entry1); err != nil { + if err := filer.CreateEntry(ctx, entry1, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -64,7 +64,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, 0) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDB2Store{} diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go deleted file mode 100644 index 9c10a5472..000000000 --- a/weed/filer2/memdb/memdb_store.go +++ /dev/null @@ -1,132 +0,0 @@ -package memdb - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/google/btree" - "strings" - "sync" -) - -func init() { - filer2.Stores = append(filer2.Stores, &MemDbStore{}) -} - -type MemDbStore struct { - tree *btree.BTree - treeLock sync.Mutex -} - -type entryItem struct { - *filer2.Entry -} - -func (a entryItem) Less(b btree.Item) bool { - return strings.Compare(string(a.FullPath), string(b.(entryItem).FullPath)) < 0 -} - -func (store *MemDbStore) GetName() string { - return "memory" -} - -func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) { - store.tree = btree.New(8) - return nil -} - -func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *MemDbStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *MemDbStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - // println("inserting", entry.FullPath) - store.treeLock.Lock() - store.tree.ReplaceOrInsert(entryItem{entry}) - store.treeLock.Unlock() - return nil -} - -func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - if _, err = store.FindEntry(ctx, entry.FullPath); err != nil { - return fmt.Errorf("no such file %s : %v", entry.FullPath, err) - } - store.treeLock.Lock() - store.tree.ReplaceOrInsert(entryItem{entry}) - store.treeLock.Unlock() - return nil -} - -func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}}) - if item == nil { - return nil, filer2.ErrNotFound - } - entry = item.(entryItem).Entry - return entry, nil -} - -func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - store.treeLock.Lock() - store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}}) - store.treeLock.Unlock() - return nil -} - -func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - - startFrom := string(fullpath) - if startFileName != "" { - startFrom = startFrom + "/" + startFileName - } - - store.tree.AscendGreaterOrEqual(entryItem{&filer2.Entry{FullPath: filer2.FullPath(startFrom)}}, - func(item btree.Item) bool { - if limit <= 0 { - return false - } - entry := item.(entryItem).Entry - // println("checking", entry.FullPath) - - if entry.FullPath == fullpath { - // skipping the current directory - // println("skipping the folder", entry.FullPath) - return true - } - - dir, name := entry.FullPath.DirAndName() - if name == startFileName { - if inclusive { - limit-- - entries = append(entries, entry) - } - return true - } - - // only iterate the same prefix - if !strings.HasPrefix(string(entry.FullPath), string(fullpath)) { - // println("breaking from", entry.FullPath) - return false - } - - if dir != string(fullpath) { - // this could be items in deeper directories - // println("skipping deeper folder", entry.FullPath) - return true - } - // now process the directory items - // println("adding entry", entry.FullPath) - limit-- - entries = append(entries, entry) - return true - }, - ) - return entries, nil -} diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go deleted file mode 100644 index 3fd806aeb..000000000 --- a/weed/filer2/memdb/memdb_store_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package memdb - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer2" - "testing" -) - -func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - store := &MemDbStore{} - store.Initialize(nil) - filer.SetStore(store) - filer.DisableDirectoryCache() - - ctx := context.Background() - - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") - - entry1 := &filer2.Entry{ - FullPath: fullpath, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - if err := filer.CreateEntry(ctx, entry1); err != nil { - t.Errorf("create entry %v: %v", entry1.FullPath, err) - return - } - - entry, err := filer.FindEntry(ctx, fullpath) - - if err != nil { - t.Errorf("find entry: %v", err) - return - } - - if entry.FullPath != entry1.FullPath { - t.Errorf("find wrong entry: %v", entry.FullPath) - return - } - -} - -func TestCreateFileAndList(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - store := &MemDbStore{} - store.Initialize(nil) - filer.SetStore(store) - filer.DisableDirectoryCache() - - ctx := context.Background() - - entry1 := &filer2.Entry{ - FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"), - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - entry2 := &filer2.Entry{ - FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"), - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - filer.CreateEntry(ctx, entry1) - filer.CreateEntry(ctx, entry2) - - // checking the 2 files - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100) - - if err != nil { - t.Errorf("list entries: %v", err) - return - } - - if len(entries) != 2 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - if entries[0].FullPath != entry1.FullPath { - t.Errorf("find wrong entry 1: %v", entries[0].FullPath) - return - } - - if entries[1].FullPath != entry2.FullPath { - t.Errorf("find wrong entry 2: %v", entries[1].FullPath) - return - } - - // checking the offset - entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking root directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // add file3 - file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg") - entry3 := &filer2.Entry{ - FullPath: file3Path, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - filer.CreateEntry(ctx, entry3) - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) - if len(entries) != 2 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // delete file and count - filer.DeleteEntryMetaAndData(ctx, file3Path, false, false, false) - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go index e18299bd2..63d99cd9d 100644 --- a/weed/filer2/mysql/mysql_store.go +++ b/weed/filer2/mysql/mysql_store.go @@ -26,28 +26,35 @@ func (store *MysqlStore) GetName() string { return "mysql" } -func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) { +func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("username"), - configuration.GetString("password"), - configuration.GetString("hostname"), - configuration.GetInt("port"), - configuration.GetString("database"), - configuration.GetInt("connection_max_idle"), - configuration.GetInt("connection_max_open"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), + configuration.GetBool(prefix+"interpolateParams"), ) } -func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int) (err error) { +func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int, + interpolateParams bool) (err error) { store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" + store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?" sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) + if interpolateParams { + sqlUrl += "&interpolateParams=true" + } + var dbErr error store.DB, dbErr = sql.Open("mysql", sqlUrl) if dbErr != nil { diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go index ffd3d1e01..27a0c2513 100644 --- a/weed/filer2/postgres/postgres_store.go +++ b/weed/filer2/postgres/postgres_store.go @@ -26,16 +26,16 @@ func (store *PostgresStore) GetName() string { return "postgres" } -func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) { +func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("username"), - configuration.GetString("password"), - configuration.GetString("hostname"), - configuration.GetInt("port"), - configuration.GetString("database"), - configuration.GetString("sslmode"), - configuration.GetInt("connection_max_idle"), - configuration.GetInt("connection_max_open"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetString(prefix+"sslmode"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), ) } @@ -45,6 +45,7 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" + store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go index 11c315391..eaaecb740 100644 --- a/weed/filer2/redis/redis_cluster_store.go +++ b/weed/filer2/redis/redis_cluster_store.go @@ -18,17 +18,25 @@ func (store *RedisClusterStore) GetName() string { return "redis_cluster" } -func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) { +func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", true) + configuration.SetDefault(prefix+"routeByLatency", true) + return store.initialize( - configuration.GetStringSlice("addresses"), - configuration.GetString("password"), + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), ) } -func (store *RedisClusterStore) initialize(addresses []string, password string) (err error) { +func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: addresses, - Password: password, + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, }) return } diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go index c56fa014c..9debdb070 100644 --- a/weed/filer2/redis/redis_store.go +++ b/weed/filer2/redis/redis_store.go @@ -18,11 +18,11 @@ func (store *RedisStore) GetName() string { return "redis" } -func (store *RedisStore) Initialize(configuration util.Configuration) (err error) { +func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("address"), - configuration.GetString("password"), - configuration.GetInt("database"), + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), ) } diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index ce41d4d70..c5b9d9416 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -3,12 +3,15 @@ package redis import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/go-redis/redis" "sort" "strings" "time" + + "github.com/go-redis/redis" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) const ( @@ -62,7 +65,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2 data, err := store.Client.Get(string(fullpath)).Result() if err == redis.Nil { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { @@ -99,10 +102,29 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file return nil } +func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + + members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() + if err != nil { + return fmt.Errorf("delete folder %s : %v", fullpath, err) + } + + for _, fileName := range members { + path := filer2.NewFullPath(string(fullpath), fileName) + _, err = store.Client.Del(string(path)).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() + dirListKey := genDirectoryListKey(string(fullpath)) + members, err := store.Client.SMembers(dirListKey).Result() if err != nil { return nil, fmt.Errorf("list %s : %v", fullpath, err) } @@ -141,6 +163,13 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full if err != nil { glog.V(0).Infof("list %s : %v", path, err) } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(string(path)).Result() + store.Client.SRem(dirListKey, fileName).Result() + continue + } + } entries = append(entries, entry) } } diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 01b87cad1..381d99144 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -26,8 +26,9 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f } for _, chunkView := range chunkViews { + urlString := fileId2Url[chunkView.FileId] - _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { + err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) { w.Write(data) }) if err != nil { diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go index 0e6b93c86..2a9dd6648 100644 --- a/weed/filer2/tikv/tikv_store.go +++ b/weed/filer2/tikv/tikv_store.go @@ -1,5 +1,6 @@ // +build !386 // +build !arm +// +build !windows package tikv @@ -12,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/pingcap/tidb/kv" @@ -30,8 +32,8 @@ func (store *TikvStore) GetName() string { return "tikv" } -func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) { - pdAddr := configuration.GetString("pdAddress") +func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + pdAddr := configuration.GetString(prefix + "pdAddress") return store.initialize(pdAddr) } @@ -110,7 +112,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) data, err := store.getTx(ctx).Get(ctx, key) if err == kv.ErrNotExist { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) @@ -141,6 +143,38 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return nil } +func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + tx := store.getTx(ctx) + + iter, err := tx.Iter(directoryPrefix, nil) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err) + } + defer iter.Close() + for iter.Valid() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + iter.Next() + continue + } + + if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + iter.Next() + } + + return nil +} + func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go index 95c8b2dad..713c84bf8 100644 --- a/weed/filer2/tikv/tikv_store_unsupported.go +++ b/weed/filer2/tikv/tikv_store_unsupported.go @@ -1,4 +1,4 @@ -// +build 386 arm +// +build 386 arm windows package tikv @@ -21,7 +21,7 @@ func (store *TikvStore) GetName() string { return "tikv" } -func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) { +func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { return fmt.Errorf("not implemented for 32 bit computers") } @@ -55,6 +55,10 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return fmt.Errorf("not implemented for 32 bit computers") } +func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + return fmt.Errorf("not implemented for 32 bit computers") +} + func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { return nil, fmt.Errorf("not implemented for 32 bit computers") |
