diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/filer2 | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filer2')
32 files changed, 1475 insertions, 717 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 47fe507a1..5ade18960 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -98,13 +99,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En return nil } -func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { +func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } entry := &filer2.Entry{ @@ -117,7 +118,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.Fu return entry, nil } -func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { +func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() @@ -134,7 +135,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2. return nil } -func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { +func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath) if err != nil { @@ -149,7 +150,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat return nil } -func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { sqlText := store.SqlListExclusive if inclusive { @@ -171,7 +172,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), name), + FullPath: util.NewFullPath(string(fullpath), name), } if err = entry.DecodeAttributesAndChunks(data); err != nil { glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) @@ -183,3 +184,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat return entries, nil } + +func (store *AbstractSqlStore) Shutdown() { + store.DB.Close() +} diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index f81ef946f..5dd7d8036 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -3,10 +3,13 @@ package cassandra import ( "context" "fmt" + + "github.com/gocql/gocql" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gocql/gocql" ) func init() { @@ -72,7 +75,7 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entr return store.InsertEntry(ctx, entry) } -func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { dir, name := fullpath.DirAndName() var data []byte @@ -80,12 +83,12 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full "SELECT meta FROM filemeta WHERE directory=? AND name=?", dir, name).Consistency(gocql.One).Scan(&data); err != nil { if err != gocql.ErrNotFound { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } } if len(data) == 0 { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } entry = &filer2.Entry{ @@ -99,7 +102,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full return entry, nil } -func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { +func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() @@ -112,7 +115,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu return nil } -func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { +func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { if err := store.session.Query( "DELETE FROM filemeta WHERE directory=?", @@ -123,7 +126,7 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath return nil } -func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" @@ -136,7 +139,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter() for iter.Scan(&name, &data) { entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), name), + FullPath: util.NewFullPath(string(fullpath), name), } if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { err = decodeErr @@ -151,3 +154,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath return entries, err } + +func (store *CassandraStore) Shutdown() { + store.session.Close() +} diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go index c901927bb..00b9b132d 100644 --- a/weed/filer2/entry.go +++ b/weed/filer2/entry.go @@ -5,6 +5,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) type Attr struct { @@ -20,6 +21,7 @@ type Attr struct { UserName string GroupNames []string SymlinkTarget string + Md5 []byte } func (attr Attr) IsDirectory() bool { @@ -27,7 +29,7 @@ func (attr Attr) IsDirectory() bool { } type Entry struct { - FullPath + util.FullPath Attr Extended map[string][]byte @@ -71,3 +73,11 @@ func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { Entry: entry.ToProtoEntry(), } } + +func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry { + return &Entry{ + FullPath: util.NewFullPath(dir, entry.Name), + Attr: PbToEntryAttribute(entry.Attributes), + Chunks: entry.Chunks, + } +} diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go index 3a2dc6134..47c911011 100644 --- a/weed/filer2/entry_codec.go +++ b/weed/filer2/entry_codec.go @@ -52,6 +52,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { UserName: entry.Attr.UserName, GroupName: entry.Attr.GroupNames, SymlinkTarget: entry.Attr.SymlinkTarget, + Md5: entry.Attr.Md5, } } @@ -71,6 +72,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t.UserName = attr.UserName t.GroupNames = attr.GroupName t.SymlinkTarget = attr.SymlinkTarget + t.Md5 = attr.Md5 return t } @@ -93,6 +95,10 @@ func EqualEntry(a, b *Entry) bool { return false } + if !bytes.Equal(a.Md5, b.Md5) { + return false + } + for i := 0; i < len(a.Chunks); i++ { if !proto.Equal(a.Chunks[i], b.Chunks[i]) { return false diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go index 0f0c01426..2ef65b4a0 100644 --- a/weed/filer2/etcd/etcd_store.go +++ b/weed/filer2/etcd/etcd_store.go @@ -6,10 +6,12 @@ import ( "strings" "time" + "go.etcd.io/etcd/clientv3" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" - "go.etcd.io/etcd/clientv3" ) const ( @@ -90,7 +92,7 @@ func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (e return store.InsertEntry(ctx, entry) } -func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { key := genKey(fullpath.DirAndName()) resp, err := store.client.Get(ctx, string(key)) @@ -99,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) } if len(resp.Kvs) == 0 { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } entry = &filer2.Entry{ @@ -113,7 +115,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) return entry, nil } -func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { key := genKey(fullpath.DirAndName()) if _, err := store.client.Delete(ctx, string(key)); err != nil { @@ -123,7 +125,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return nil } -func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { @@ -134,7 +136,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer } func (store *EtcdStore) ListDirectoryEntries( - ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int, + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, ) (entries []*filer2.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") @@ -157,7 +159,7 @@ func (store *EtcdStore) ListDirectoryEntries( break } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), + FullPath: weed_util.NewFullPath(string(fullpath), fileName), } if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { err = decodeErr @@ -177,7 +179,7 @@ func genKey(dirPath, fileName string) (key []byte) { return key } -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { keyPrefix = []byte(string(fullpath)) keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) if len(startFileName) > 0 { @@ -194,3 +196,7 @@ func getNameFromKey(key []byte) string { return string(key[sepIndex+1:]) } + +func (store *EtcdStore) Shutdown() { + store.client.Close() +} diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index b5876df82..2ddfb3c30 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -3,6 +3,7 @@ package filer2 import ( "fmt" "hash/fnv" + "math" "sort" "sync" @@ -19,7 +20,21 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { return } -func ETag(chunks []*filer_pb.FileChunk) (etag string) { +func ETag(entry *filer_pb.Entry) (etag string) { + if entry.Attributes == nil || entry.Attributes.Md5 == nil { + return ETagChunks(entry.Chunks) + } + return fmt.Sprintf("%x", entry.Attributes.Md5) +} + +func ETagEntry(entry *Entry) (etag string) { + if entry.Attr.Md5 == nil { + return ETagChunks(entry.Chunks) + } + return fmt.Sprintf("%x", entry.Attr.Md5) +} + +func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { if len(chunks) == 1 { return chunks[0].ETag } @@ -70,10 +85,16 @@ type ChunkView struct { Offset int64 Size uint64 LogicOffset int64 - IsFullChunk bool + ChunkSize uint64 + CipherKey []byte + IsGzipped bool } -func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) { +func (cv *ChunkView) IsFullChunk() bool { + return cv.Size == cv.ChunkSize +} + +func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { visibles := NonOverlappingVisibleIntervals(chunks) @@ -81,19 +102,27 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views } -func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) { +func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) { - stop := offset + int64(size) + stop := offset + size + if size == math.MaxInt64 { + stop = math.MaxInt64 + } + if stop < offset { + stop = math.MaxInt64 + } for _, chunk := range visibles { + if chunk.start <= offset && offset < chunk.stop && offset < stop { - isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop views = append(views, &ChunkView{ FileId: chunk.fileId, Offset: offset - chunk.start, // offset is the data starting location in this file id Size: uint64(min(chunk.stop, stop) - offset), LogicOffset: offset, - IsFullChunk: isFullChunk, + ChunkSize: chunk.chunkSize, + CipherKey: chunk.cipherKey, + IsGzipped: chunk.isGzipped, }) offset = min(chunk.stop, stop) } @@ -120,13 +149,7 @@ var bufPool = sync.Pool{ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { - newV := newVisibleInterval( - chunk.Offset, - chunk.Offset+int64(chunk.Size), - chunk.GetFileIdString(), - chunk.Mtime, - true, - ) + newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped) length := len(visibles) if length == 0 { @@ -140,23 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. logPrintf(" before", visibles) for _, v := range visibles { if v.start < chunk.Offset && chunk.Offset < v.stop { - newVisibles = append(newVisibles, newVisibleInterval( - v.start, - chunk.Offset, - v.fileId, - v.modifiedTime, - false, - )) + newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped)) } chunkStop := chunk.Offset + int64(chunk.Size) if v.start < chunkStop && chunkStop < v.stop { - newVisibles = append(newVisibles, newVisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false, - )) + newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped)) } if chunkStop <= v.start || v.stop <= chunk.Offset { newVisibles = append(newVisibles, v) @@ -187,6 +198,7 @@ func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []Vi var newVisibles []VisibleInterval for _, chunk := range chunks { + newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk) t := visibles[:0] visibles = newVisibles @@ -207,16 +219,20 @@ type VisibleInterval struct { stop int64 modifiedTime int64 fileId string - isFullChunk bool + chunkSize uint64 + cipherKey []byte + isGzipped bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval { +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { return VisibleInterval{ start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime, - isFullChunk: isFullChunk, + chunkSize: chunkSize, + cipherKey: cipherKey, + isGzipped: isGzipped, } } diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index bb4a6c74d..7b1133b85 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -218,7 +218,7 @@ func TestChunksReading(t *testing.T) { testcases := []struct { Chunks []*filer_pb.FileChunk Offset int64 - Size int + Size int64 Expected []*ChunkView }{ // case 0: normal diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index a0af942e0..666ab8fe4 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "path/filepath" "strings" "time" @@ -13,6 +12,9 @@ import ( "github.com/karlseguin/ccache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -24,20 +26,30 @@ var ( ) type Filer struct { - store *FilerStoreWrapper - directoryCache *ccache.Cache - MasterClient *wdclient.MasterClient - fileIdDeletionChan chan string - GrpcDialOption grpc.DialOption + store *FilerStoreWrapper + directoryCache *ccache.Cache + MasterClient *wdclient.MasterClient + fileIdDeletionQueue *util.UnboundedQueue + GrpcDialOption grpc.DialOption + DirBucketsPath string + FsyncBuckets []string + buckets *FilerBuckets + Cipher bool + MetaLogBuffer *log_buffer.LogBuffer + metaLogCollection string + metaLogReplication string } -func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { f := &Filer{ - directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), - fileIdDeletionChan: make(chan string, PaginationSize), - GrpcDialOption: grpcDialOption, + directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), + fileIdDeletionQueue: util.NewUnboundedQueue(), + GrpcDialOption: grpcDialOption, } + f.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.metaLogCollection = collection + f.metaLogReplication = replication go f.loopProcessingDeletion() @@ -85,7 +97,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro var lastDirectoryEntry *Entry for i := 1; i < len(dirParts); i++ { - dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...)) + dirPath := "/" + util.Join(dirParts[:i]...) // fmt.Printf("%d directory: %+v\n", i, dirPath) // first check local cache @@ -94,7 +106,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro // not found, check the store directly if dirEntry == nil { glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) + dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath)) } else { // glog.V(4).Infof("found cached directory: %s", dirPath) } @@ -106,24 +118,29 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro now := time.Now() dirEntry = &Entry{ - FullPath: FullPath(dirPath), + FullPath: util.FullPath(dirPath), Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | 0770, - Uid: entry.Uid, - Gid: entry.Gid, + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0110, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, + UserName: entry.UserName, + GroupNames: entry.GroupNames, }, } glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) mkdirErr := f.store.InsertEntry(ctx, dirEntry) if mkdirErr != nil { - if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { + f.maybeAddBucket(dirEntry) f.NotifyUpdateEvent(nil, dirEntry, false) } @@ -174,6 +191,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro } } + f.maybeAddBucket(entry) f.NotifyUpdateEvent(oldEntry, entry, true) f.deleteChunksIfNotNew(oldEntry, entry) @@ -197,7 +215,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er return f.store.UpdateEntry(ctx, entry) } -func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) { +func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) { now := time.Now() @@ -213,14 +231,51 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er }, }, nil } - return f.store.FindEntry(ctx, p) + entry, err = f.store.FindEntry(ctx, p) + if entry != nil && entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + return nil, filer_pb.ErrNotFound + } + } + return + } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } - return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + + var makeupEntries []*Entry + entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + for expiredCount > 0 && err == nil { + makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount) + if err == nil { + entries = append(entries, makeupEntries...) + } + } + + return entries, err +} + +func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) { + listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + if listErr != nil { + return listedEntries, expiredCount, "", listErr + } + for _, entry := range listedEntries { + lastFileName = entry.Name() + if entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + expiredCount++ + continue + } + } + entries = append(entries, entry) + } + return } func (f *Filer) cacheDelDirectory(dirpath string) { @@ -261,3 +316,8 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute) } + +func (f *Filer) Shutdown() { + f.MetaLogBuffer.Shutdown() + f.store.Shutdown() +} diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go new file mode 100644 index 000000000..7a57e7ee1 --- /dev/null +++ b/weed/filer2/filer_buckets.go @@ -0,0 +1,121 @@ +package filer2 + +import ( + "context" + "math" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type BucketName string +type BucketOption struct { + Name BucketName + Replication string + fsync bool +} +type FilerBuckets struct { + dirBucketsPath string + buckets map[BucketName]*BucketOption + sync.RWMutex +} + +func (f *Filer) LoadBuckets() { + + f.buckets = &FilerBuckets{ + buckets: make(map[BucketName]*BucketOption), + } + + limit := math.MaxInt32 + + entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit) + + if err != nil { + glog.V(1).Infof("no buckets found: %v", err) + return + } + + shouldFsyncMap := make(map[string]bool) + for _, bucket := range f.FsyncBuckets { + shouldFsyncMap[bucket] = true + } + + glog.V(1).Infof("buckets found: %d", len(entries)) + + f.buckets.Lock() + for _, entry := range entries { + _, shouldFsnyc := shouldFsyncMap[entry.Name()] + f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ + Name: BucketName(entry.Name()), + Replication: entry.Replication, + fsync: shouldFsnyc, + } + } + f.buckets.Unlock() + +} + +func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) { + + f.buckets.RLock() + defer f.buckets.RUnlock() + + option, found := f.buckets.buckets[BucketName(buketName)] + + if !found { + return "", false + } + return option.Replication, option.fsync + +} + +func (f *Filer) isBucket(entry *Entry) bool { + if !entry.IsDirectory() { + return false + } + parent, dirName := entry.FullPath.DirAndName() + if parent != f.DirBucketsPath { + return false + } + + f.buckets.RLock() + defer f.buckets.RUnlock() + + _, found := f.buckets.buckets[BucketName(dirName)] + + return found + +} + +func (f *Filer) maybeAddBucket(entry *Entry) { + if !entry.IsDirectory() { + return + } + parent, dirName := entry.FullPath.DirAndName() + if parent != f.DirBucketsPath { + return + } + f.addBucket(dirName, &BucketOption{ + Name: BucketName(dirName), + Replication: entry.Replication, + }) +} + +func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) { + + f.buckets.Lock() + defer f.buckets.Unlock() + + f.buckets.buckets[BucketName(buketName)] = bucketOption + +} + +func (f *Filer) deleteBucket(buketName string) { + + f.buckets.Lock() + defer f.buckets.Unlock() + + delete(f.buckets.buckets, BucketName(buketName)) + +} diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go deleted file mode 100644 index af804b909..000000000 --- a/weed/filer2/filer_client_util.go +++ /dev/null @@ -1,172 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "io" - "math" - "strings" - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func VolumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} - -type FilerClient interface { - WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error -} - -func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { - var vids []string - for _, chunkView := range chunkViews { - vids = append(vids, VolumeId(chunkView.FileId)) - } - - vid2Locations := make(map[string]*filer_pb.Locations) - - err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return err - } - - vid2Locations = resp.LocationsMap - - return nil - }) - - if err != nil { - return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) - } - - var wg sync.WaitGroup - for _, chunkView := range chunkViews { - wg.Add(1) - go func(chunkView *ChunkView) { - defer wg.Done() - - glog.V(4).Infof("read fh reading chunk: %+v", chunkView) - - locations := vid2Locations[VolumeId(chunkView.FileId)] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", chunkView.FileId) - err = fmt.Errorf("failed to locate %s", chunkView.FileId) - return - } - - var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], - !chunkView.IsFullChunk) - - if err != nil { - - glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) - - err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) - return - } - - glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) - totalRead += n - - }(chunkView) - } - wg.Wait() - return -} - -func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) { - - dir, name := fullFilePath.DirAndName() - - err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir, - Name: name, - } - - // glog.V(3).Infof("read %s request: %v", fullFilePath, request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) { - return nil - } - glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err) - return err - } - - if resp.Entry == nil { - // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry) - return nil - } - - entry = resp.Entry - return nil - }) - - return -} - -func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { - - err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - lastEntryName := "" - - request := &filer_pb.ListEntriesRequest{ - Directory: string(fullDirPath), - Prefix: prefix, - StartFromFileName: lastEntryName, - Limit: math.MaxUint32, - } - - glog.V(3).Infof("read directory: %v", request) - stream, err := client.ListEntries(ctx, request) - if err != nil { - return fmt.Errorf("list %s: %v", fullDirPath, err) - } - - var prevEntry *filer_pb.Entry - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - if prevEntry != nil { - fn(prevEntry, true) - } - break - } else { - return recvErr - } - } - if prevEntry != nil { - fn(prevEntry, false) - } - prevEntry = resp.Entry - } - - return nil - - }) - - return -} diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go index 75a09e7ef..2fb53c579 100644 --- a/weed/filer2/filer_delete_entry.go +++ b/weed/filer2/filer_delete_entry.go @@ -6,9 +6,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { +func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { if p == "/" { return nil } @@ -18,27 +20,35 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs return findErr } + isCollection := f.isBucket(entry) + var chunks []*filer_pb.FileChunk chunks = append(chunks, entry.Chunks...) if entry.IsDirectory() { // delete the folder children, not including the folder itself var dirChunks []*filer_pb.FileChunk - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks) + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection) if err != nil { + glog.V(0).Infof("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err) } chunks = append(chunks, dirChunks...) - f.cacheDelDirectory(string(p)) } + // delete the file or folder err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks) if err != nil { return fmt.Errorf("delete file %s: %v", p, err) } - if shouldDeleteChunks { + if shouldDeleteChunks && !isCollection { go f.DeleteChunks(chunks) } + if isCollection { + collectionName := entry.Name() + f.doDeleteCollection(collectionName) + f.deleteBucket(collectionName) + } return nil } @@ -63,13 +73,15 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry var dirChunks []*filer_pb.FileChunk if sub.IsDirectory() { dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks) + f.cacheDelDirectory(string(sub.FullPath)) + f.NotifyUpdateEvent(sub, nil, shouldDeleteChunks) + chunks = append(chunks, dirChunks...) + } else { + chunks = append(chunks, sub.Chunks...) } if err != nil && !ignoreRecursiveError { return nil, err } - if shouldDeleteChunks { - chunks = append(chunks, dirChunks...) - } } if len(entries) < PaginationSize { @@ -77,26 +89,40 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry } } - f.cacheDelDirectory(string(entry.FullPath)) - - glog.V(3).Infof("deleting directory %v", entry.FullPath) + glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) } - f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) return chunks, nil } func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) { - glog.V(3).Infof("deleting entry %v", entry.FullPath) + glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { return fmt.Errorf("filer store delete: %v", storeDeletionErr) } + if entry.IsDirectory() { + f.cacheDelDirectory(string(entry.FullPath)) + } f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) return nil } + +func (f *Filer) doDeleteCollection(collectionName string) (err error) { + + return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ + Name: collectionName, + }) + if err != nil { + glog.Infof("delete collection %s: %v", collectionName, err) + } + return err + }) + +} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 9937685f7..a6b229771 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -6,16 +6,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" ) -func (f *Filer) loopProcessingDeletion() { - - ticker := time.NewTicker(5 * time.Second) - - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { +func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) { + return func(vids []string) (map[string]operation.LookupResult, error) { m := make(map[string]operation.LookupResult) for _, vid := range vids { - locs, _ := f.MasterClient.GetVidLocations(vid) + locs, _ := masterClient.GetVidLocations(vid) var locations []operation.Location for _, loc := range locs { locations = append(locations, operation.Location{ @@ -30,37 +28,56 @@ func (f *Filer) loopProcessingDeletion() { } return m, nil } +} + +func (f *Filer) loopProcessingDeletion() { + + lookupFunc := LookupByMasterClientFn(f.MasterClient) + + DeletionBatchSize := 100000 // roughly 20 bytes cost per file id. - var fileIds []string + var deletionCount int for { - select { - case fid := <-f.fileIdDeletionChan: - fileIds = append(fileIds, fid) - if len(fileIds) >= 4096 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] + deletionCount = 0 + f.fileIdDeletionQueue.Consume(func(fileIds []string) { + for len(fileIds) > 0 { + var toDeleteFileIds []string + if len(fileIds) > DeletionBatchSize { + toDeleteFileIds = fileIds[:DeletionBatchSize] + fileIds = fileIds[DeletionBatchSize:] + } else { + toDeleteFileIds = fileIds + fileIds = fileIds[:0] + } + deletionCount = len(toDeleteFileIds) + deleteResults, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc) + if err != nil { + glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) + } else { + glog.V(1).Infof("deleting fileIds len=%d", deletionCount) + } + if len(deleteResults) != deletionCount { + glog.V(0).Infof("delete %d fileIds actual %d", deletionCount, len(deleteResults)) + } } + }) + + if deletionCount == 0 { + time.Sleep(1123 * time.Millisecond) } } } func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - f.fileIdDeletionChan <- chunk.GetFileIdString() + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) } } // DeleteFileByFileId direct delete by file id. // Only used when the fileId is not being managed by snapshots. func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionChan <- fileId + f.fileIdDeletionQueue.EnQueue(fileId) } func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index c37381116..ecb488373 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -1,39 +1,157 @@ package filer2 import ( + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) { - var key string + var fullpath string if oldEntry != nil { - key = string(oldEntry.FullPath) + fullpath = string(oldEntry.FullPath) } else if newEntry != nil { - key = string(newEntry.FullPath) + fullpath = string(newEntry.FullPath) } else { return } + // println("fullpath:", fullpath) + + if strings.HasPrefix(fullpath, SystemLogDir) { + return + } + + newParentPath := "" + if newEntry != nil { + newParentPath, _ = newEntry.FullPath.DirAndName() + } + eventNotification := &filer_pb.EventNotification{ + OldEntry: oldEntry.ToProtoEntry(), + NewEntry: newEntry.ToProtoEntry(), + DeleteChunks: deleteChunks, + NewParentPath: newParentPath, + } + if notification.Queue != nil { + glog.V(3).Infof("notifying entry update %v", fullpath) + notification.Queue.SendMessage(fullpath, eventNotification) + } + + f.logMetaEvent(fullpath, eventNotification) + +} + +func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) { + + dir, _ := util.FullPath(fullpath).DirAndName() + + event := &filer_pb.SubscribeMetadataResponse{ + Directory: dir, + EventNotification: eventNotification, + TsNs: time.Now().UnixNano(), + } + data, err := proto.Marshal(event) + if err != nil { + glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err) + return + } + + f.MetaLogBuffer.AddToBuffer([]byte(dir), data) - glog.V(3).Infof("notifying entry update %v", key) +} + +func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { + + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + // startTime.Second(), startTime.Nanosecond(), + ) + + if err := f.appendToFile(targetFile, buf); err != nil { + glog.V(0).Infof("log write failed %s: %v", targetFile, err) + } +} + +func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { + + startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) - newParentPath := "" - if newEntry != nil { - newParentPath, _ = newEntry.FullPath.DirAndName() + sizeBuf := make([]byte, 4) + startTsNs := startTime.UnixNano() + + dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366) + if listDayErr != nil { + return fmt.Errorf("fail to list log by day: %v", listDayErr) + } + for _, dayEntry := range dayEntries { + // println("checking day", dayEntry.FullPath) + hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60) + if listHourMinuteErr != nil { + return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } + for _, hourMinuteEntry := range hourMinuteEntries { + // println("checking hh-mm", hourMinuteEntry.FullPath) + if dayEntry.Name() == startDate { + if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 { + continue + } + } + // println("processing", hourMinuteEntry.FullPath) + chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) + if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + chunkedFileReader.Close() + if err == io.EOF { + break + } + return fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) + } + chunkedFileReader.Close() + } + } - notification.Queue.SendMessage( - key, - &filer_pb.EventNotification{ - OldEntry: oldEntry.ToProtoEntry(), - NewEntry: newEntry.ToProtoEntry(), - DeleteChunks: deleteChunks, - NewParentPath: newParentPath, - }, - ) + return nil +} +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { + for { + n, err := r.Read(sizeBuf) + if err != nil { + return err + } + if n != 4 { + return fmt.Errorf("size %d bytes, expected 4 bytes", n) + } + size := util.BytesToUint32(sizeBuf) + // println("entry size", size) + entryData := make([]byte, size) + n, err = r.Read(entryData) + if err != nil { + return err + } + if n != int(size) { + return fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) + } + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + return err + } + if logEntry.TsNs <= ns { + return nil + } + // println("each log: ", logEntry.TsNs) + if err := eachLogEntryFn(logEntry); err != nil { + return err + } } } diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go new file mode 100644 index 000000000..af291058c --- /dev/null +++ b/weed/filer2/filer_notify_append.go @@ -0,0 +1,73 @@ +package filer2 + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (f *Filer) appendToFile(targetFile string, data []byte) error { + + assignResult, uploadResult, err2 := f.assignAndUpload(data) + if err2 != nil { + return err2 + } + + // find out existing entry + fullpath := util.FullPath(targetFile) + entry, err := f.FindEntry(context.Background(), fullpath) + var offset int64 = 0 + if err == filer_pb.ErrNotFound { + entry = &Entry{ + FullPath: fullpath, + Attr: Attr{ + Crtime: time.Now(), + Mtime: time.Now(), + Mode: os.FileMode(0644), + Uid: OS_UID, + Gid: OS_GID, + }, + } + } else { + offset = int64(TotalSize(entry.Chunks)) + } + + // append to existing chunks + entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) + + // update the entry + err = f.CreateEntry(context.Background(), entry, false) + + return err +} + +func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { + // assign a volume location + assignRequest := &operation.VolumeAssignRequest{ + Count: 1, + Collection: f.metaLogCollection, + Replication: f.metaLogReplication, + WritableVolumeCount: 1, + } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + if err != nil { + return nil, nil, fmt.Errorf("AssignVolume: %v", err) + } + if assignResult.Error != "" { + return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error) + } + + // upload data + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) + if err != nil { + return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + // println("uploaded to", targetUrl) + return assignResult, uploadResult, nil +} diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go index b74e2ad35..29170bfdf 100644 --- a/weed/filer2/filer_notify_test.go +++ b/weed/filer2/filer_notify_test.go @@ -5,13 +5,15 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" ) func TestProtoMarshalText(t *testing.T) { oldEntry := &Entry{ - FullPath: FullPath("/this/path/to"), + FullPath: util.FullPath("/this/path/to"), Attr: Attr{ Mtime: time.Now(), Mode: 0644, diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index ae25534ed..f36c74f14 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -2,7 +2,6 @@ package filer2 import ( "context" - "errors" "time" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -18,17 +17,17 @@ type FilerStore interface { InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found - FindEntry(context.Context, FullPath) (entry *Entry, err error) - DeleteEntry(context.Context, FullPath) (err error) - DeleteFolderChildren(context.Context, FullPath) (err error) - ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + FindEntry(context.Context, util.FullPath) (entry *Entry, err error) + DeleteEntry(context.Context, util.FullPath) (err error) + DeleteFolderChildren(context.Context, util.FullPath) (err error) + ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) BeginTransaction(ctx context.Context) (context.Context, error) CommitTransaction(ctx context.Context) error RollbackTransaction(ctx context.Context) error -} -var ErrNotFound = errors.New("filer: no entry is found in filer store") + Shutdown() +} type FilerStoreWrapper struct { actualStore FilerStore @@ -73,7 +72,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err return fsw.actualStore.UpdateEntry(ctx, entry) } -func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) { +func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc() start := time.Now() defer func() { @@ -88,7 +87,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry return } -func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) { +func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc() start := time.Now() defer func() { @@ -98,7 +97,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err return fsw.actualStore.DeleteEntry(ctx, fp) } -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) { +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc() start := time.Now() defer func() { @@ -108,7 +107,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullP return fsw.actualStore.DeleteFolderChildren(ctx, fp) } -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc() start := time.Now() defer func() { @@ -136,3 +135,7 @@ func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { return fsw.actualStore.RollbackTransaction(ctx) } + +func (fsw *FilerStoreWrapper) Shutdown() { + fsw.actualStore.Shutdown() +} diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go deleted file mode 100644 index 133069f93..000000000 --- a/weed/filer2/fullpath.go +++ /dev/null @@ -1,42 +0,0 @@ -package filer2 - -import ( - "path/filepath" - "strings" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type FullPath string - -func NewFullPath(dir, name string) FullPath { - return FullPath(dir).Child(name) -} - -func (fp FullPath) DirAndName() (string, string) { - dir, name := filepath.Split(string(fp)) - if dir == "/" { - return dir, name - } - if len(dir) < 1 { - return "/", "" - } - return dir[:len(dir)-1], name -} - -func (fp FullPath) Name() string { - _, name := filepath.Split(string(fp)) - return name -} - -func (fp FullPath) Child(name string) FullPath { - dir := string(fp) - if strings.HasSuffix(dir, "/") { - return FullPath(dir + name) - } - return FullPath(dir + "/" + name) -} - -func (fp FullPath) AsInode() uint64 { - return uint64(util.HashStringToLong(string(fp))) -} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 44e6ac0eb..31919ca49 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -6,11 +6,13 @@ import ( "fmt" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" ) @@ -48,8 +50,13 @@ func (store *LevelDBStore) initialize(dir string) (err error) { } if store.db, err = leveldb.OpenFile(dir, opts); err != nil { - glog.Infof("filer store open dir %s: %v", dir, err) - return + if errors.IsCorrupted(err) { + store.db, err = leveldb.RecoverFile(dir, opts) + } + if err != nil { + glog.Infof("filer store open dir %s: %v", dir, err) + return + } } return } @@ -88,13 +95,13 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) return store.InsertEntry(ctx, entry) } -func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { key := genKey(fullpath.DirAndName()) data, err := store.db.Get(key, nil) if err == leveldb.ErrNotFound { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) @@ -113,7 +120,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa return entry, nil } -func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { key := genKey(fullpath.DirAndName()) err = store.db.Delete(key, nil) @@ -124,7 +131,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full return nil } -func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { batch := new(leveldb.Batch) @@ -152,7 +159,7 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath fi return nil } -func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") @@ -175,7 +182,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath fi break } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), + FullPath: weed_util.NewFullPath(string(fullpath), fileName), } if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { err = decodeErr @@ -196,7 +203,7 @@ func genKey(dirPath, fileName string) (key []byte) { return key } -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { keyPrefix = []byte(string(fullpath)) keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) if len(startFileName) > 0 { @@ -215,3 +222,7 @@ func getNameFromKey(key []byte) string { return string(key[sepIndex+1:]) } + +func (store *LevelDBStore) Shutdown() { + store.db.Close() +} diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 983e1cbe9..1daa47c97 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -2,14 +2,16 @@ package leveldb import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer2" "io/ioutil" "os" "testing" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) { filer.SetStore(store) filer.DisableDirectoryCache() - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") ctx := context.Background() @@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -64,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go index 358d4d92a..c907e8746 100644 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -9,11 +9,13 @@ import ( "os" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" ) @@ -51,9 +53,12 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { dbFolder := fmt.Sprintf("%s/%02d", dir, d) os.MkdirAll(dbFolder, 0755) db, dbErr := leveldb.OpenFile(dbFolder, opts) + if errors.IsCorrupted(dbErr) { + db, dbErr = leveldb.RecoverFile(dbFolder, opts) + } if dbErr != nil { glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr) - return + return dbErr } store.dbs = append(store.dbs, db) } @@ -97,14 +102,14 @@ func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry return store.InsertEntry(ctx, entry) } -func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { dir, name := fullpath.DirAndName() key, partitionId := genKey(dir, name, store.dbCount) data, err := store.dbs[partitionId].Get(key, nil) if err == leveldb.ErrNotFound { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) @@ -123,7 +128,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP return entry, nil } -func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { dir, name := fullpath.DirAndName() key, partitionId := genKey(dir, name, store.dbCount) @@ -135,7 +140,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful return nil } -func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) batch := new(leveldb.Batch) @@ -163,7 +168,7 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath f return nil } -func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) @@ -187,7 +192,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath f break } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), + FullPath: weed_util.NewFullPath(string(fullpath), fileName), } // println("list", entry.FullPath, "chunks", len(entry.Chunks)) @@ -210,7 +215,7 @@ func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) return key, partitionId } -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount) if len(startFileName) > 0 { keyPrefix = append(keyPrefix, []byte(startFileName)...) @@ -235,3 +240,9 @@ func hashToBytes(dir string, dbCount int) ([]byte, int) { return b, int(x) % dbCount } + +func (store *LevelDB2Store) Shutdown() { + for d := 0; d < store.dbCount; d++ { + store.dbs[d].Close() + } +} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index 58637b7b6..9ad168233 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -2,14 +2,16 @@ package leveldb import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer2" "io/ioutil" "os" "testing" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDB2Store{} @@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) { filer.SetStore(store) filer.DisableDirectoryCache() - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") ctx := context.Background() @@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -64,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil) + filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDB2Store{} @@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer2/mongodb/mongodb_store.go b/weed/filer2/mongodb/mongodb_store.go new file mode 100644 index 000000000..375a457a4 --- /dev/null +++ b/weed/filer2/mongodb/mongodb_store.go @@ -0,0 +1,210 @@ +package mongodb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx" + "time" +) + +func init() { + filer2.Stores = append(filer2.Stores, &MongodbStore{}) +} + +type MongodbStore struct { + connect *mongo.Client + database string + collectionName string +} + +type Model struct { + Directory string `bson:"directory"` + Name string `bson:"name"` + Meta []byte `bson:"meta"` +} + +func (store *MongodbStore) GetName() string { + return "mongodb" +} + +func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + store.database = configuration.GetString(prefix + "database") + store.collectionName = "filemeta" + poolSize := configuration.GetInt(prefix + "option_pool_size") + return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize)) +} + +func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + opts := options.Client().ApplyURI(uri) + + if poolSize > 0 { + opts.SetMaxPoolSize(poolSize) + } + + client, err := mongo.Connect(ctx, opts) + if err != nil { + return err + } + + c := client.Database(store.database).Collection(store.collectionName) + err = store.indexUnique(c) + store.connect = client + return err +} + +func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error { + _, err := c.Indexes().CreateOne(context.Background(), index, opts) + return err +} + +func (store *MongodbStore) indexUnique(c *mongo.Collection) error { + opts := options.CreateIndexes().SetMaxTime(10 * time.Second) + + unique := new(bool) + *unique = true + + index := mongo.IndexModel{ + Keys: bsonx.Doc{{Key: "directory", Value: bsonx.Int32(1)}, {Key: "name", Value: bsonx.Int32(1)}}, + Options: &options.IndexOptions{ + Unique: unique, + }, + } + + return store.createIndex(c, index, opts) +} + +func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (store *MongodbStore) CommitTransaction(ctx context.Context) error { + return nil +} + +func (store *MongodbStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + c := store.connect.Database(store.database).Collection(store.collectionName) + + _, err = c.InsertOne(ctx, Model{ + Directory: dir, + Name: name, + Meta: meta, + }) + + return nil +} + +func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { + + dir, name := fullpath.DirAndName() + var data Model + + var where = bson.M{"directory": dir, "name": name} + err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data) + if err != mongo.ErrNoDocuments && err != nil { + return nil, filer_pb.ErrNotFound + } + + if len(data.Meta) == 0 { + return nil, filer_pb.ErrNotFound + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + + err = entry.DecodeAttributesAndChunks(data.Meta) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { + + dir, name := fullpath.DirAndName() + + where := bson.M{"directory": dir, "name": name} + _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + + where := bson.M{"directory": fullpath} + _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { + + var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}} + if inclusive { + where["name"] = bson.M{ + "$gte": startFileName, + } + } + optLimit := int64(limit) + opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}} + cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts) + for cur.Next(ctx) { + var data Model + err := cur.Decode(&data) + if err != nil && err != mongo.ErrNoDocuments { + return nil, err + } + + entry := &filer2.Entry{ + FullPath: util.NewFullPath(string(fullpath), data.Name), + } + if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + + entries = append(entries, entry) + } + + if err := cur.Close(ctx); err != nil { + glog.V(0).Infof("list iterator close: %v", err) + } + + return entries, err +} + +func (store *MongodbStore) Shutdown() { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + store.connect.Disconnect(ctx) +} diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go index 27a0c2513..51c069aae 100644 --- a/weed/filer2/postgres/postgres_store.go +++ b/weed/filer2/postgres/postgres_store.go @@ -11,7 +11,7 @@ import ( ) const ( - CONNECTION_URL_PATTERN = "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30" + CONNECTION_URL_PATTERN = "host=%s port=%d user=%s sslmode=%s connect_timeout=30" ) func init() { @@ -49,7 +49,13 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" - sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, password, database, sslmode) + sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode) + if password != "" { + sqlUrl += " password=" + password + } + if database != "" { + sqlUrl += " dbname=" + database + } var dbErr error store.DB, dbErr = sql.Open("postgres", sqlUrl) if dbErr != nil { diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go new file mode 100644 index 000000000..53fff7672 --- /dev/null +++ b/weed/filer2/reader_at.go @@ -0,0 +1,156 @@ +package filer2 + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" + "github.com/chrislusf/seaweedfs/weed/wdclient" +) + +type ChunkReadAt struct { + masterClient *wdclient.MasterClient + chunkViews []*ChunkView + buffer []byte + bufferOffset int64 + lookupFileId func(fileId string) (targetUrl string, err error) + readerLock sync.Mutex + + chunkCache *chunk_cache.ChunkCache +} + +// var _ = io.ReaderAt(&ChunkReadAt{}) + +type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error) + +func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { + return func(fileId string) (targetUrl string, err error) { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + vid := VolumeId(fileId) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations := resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) + + targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + + return nil + }) + return + } +} + +func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt { + + return &ChunkReadAt{ + chunkViews: chunkViews, + lookupFileId: LookupFn(filerClient), + bufferOffset: -1, + chunkCache: chunkCache, + } +} + +func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { + + c.readerLock.Lock() + defer c.readerLock.Unlock() + + for n < len(p) && err == nil { + readCount, readErr := c.doReadAt(p[n:], offset+int64(n)) + n += readCount + err = readErr + if readCount == 0 { + return n, nil + } + } + return +} + +func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { + + var found bool + for _, chunk := range c.chunkViews { + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + found = true + if c.bufferOffset != chunk.LogicOffset { + c.buffer, err = c.fetchChunkData(chunk) + c.bufferOffset = chunk.LogicOffset + } + break + } + } + if !found { + return 0, io.EOF + } + + n = copy(p, c.buffer[offset-c.bufferOffset:]) + + // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer))) + + return + +} + +func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err error) { + + // fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + + hasDataInCache := false + chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + if chunkData != nil { + glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + hasDataInCache = true + } else { + chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) + if err != nil { + return nil, err + } + } + + if int64(len(chunkData)) < chunkView.Offset+int64(chunkView.Size) { + return nil, fmt.Errorf("unexpected larger chunkView [%d,%d) than chunk %d", chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData)) + } + + data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)] + + if !hasDataInCache { + c.chunkCache.SetChunk(chunkView.FileId, chunkData) + } + + return data, nil +} + +func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { + + urlString, err := c.lookupFileId(fileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) + return nil, err + } + var buffer bytes.Buffer + err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", fileId, err) + return nil, err + } + + return buffer.Bytes(), nil +} diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index 62257e91e..e5b9e8840 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -3,12 +3,16 @@ package redis import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/go-redis/redis" "sort" "strings" "time" + + "github.com/go-redis/redis" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) const ( @@ -58,11 +62,11 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2 return store.InsertEntry(ctx, entry) } -func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { data, err := store.Client.Get(string(fullpath)).Result() if err == redis.Nil { - return nil, filer2.ErrNotFound + return nil, filer_pb.ErrNotFound } if err != nil { @@ -80,7 +84,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2 return entry, nil } -func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { _, err = store.Client.Del(string(fullpath)).Result() @@ -99,7 +103,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file return nil } -func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() if err != nil { @@ -107,7 +111,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full } for _, fileName := range members { - path := filer2.NewFullPath(string(fullpath), fileName) + path := util.NewFullPath(string(fullpath), fileName) _, err = store.Client.Del(string(path)).Result() if err != nil { return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) @@ -117,10 +121,11 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full return nil } -func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() + dirListKey := genDirectoryListKey(string(fullpath)) + members, err := store.Client.SMembers(dirListKey).Result() if err != nil { return nil, fmt.Errorf("list %s : %v", fullpath, err) } @@ -154,11 +159,18 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full // fetch entry meta for _, fileName := range members { - path := filer2.NewFullPath(string(fullpath), fileName) + path := util.NewFullPath(string(fullpath), fileName) entry, err := store.FindEntry(ctx, path) if err != nil { glog.V(0).Infof("list %s : %v", path, err) } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(string(path)).Result() + store.Client.SRem(dirListKey, fileName).Result() + continue + } + } entries = append(entries, entry) } } @@ -169,3 +181,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full func genDirectoryListKey(dir string) (dirList string) { return dir + DIR_LIST_MARKER } + +func (store *UniversalRedisStore) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer2/redis2/redis_cluster_store.go b/weed/filer2/redis2/redis_cluster_store.go new file mode 100644 index 000000000..b252eabab --- /dev/null +++ b/weed/filer2/redis2/redis_cluster_store.go @@ -0,0 +1,42 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer2.Stores = append(filer2.Stores, &RedisCluster2Store{}) +} + +type RedisCluster2Store struct { + UniversalRedis2Store +} + +func (store *RedisCluster2Store) GetName() string { + return "redis_cluster2" +} + +func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", true) + configuration.SetDefault(prefix+"routeByLatency", true) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + ) +} + +func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + return +} diff --git a/weed/filer2/redis2/redis_store.go b/weed/filer2/redis2/redis_store.go new file mode 100644 index 000000000..1e2a20043 --- /dev/null +++ b/weed/filer2/redis2/redis_store.go @@ -0,0 +1,36 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer2.Stores = append(filer2.Stores, &Redis2Store{}) +} + +type Redis2Store struct { + UniversalRedis2Store +} + +func (store *Redis2Store) GetName() string { + return "redis2" +} + +func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + return +} diff --git a/weed/filer2/redis2/universal_redis_store.go b/weed/filer2/redis2/universal_redis_store.go new file mode 100644 index 000000000..420336b46 --- /dev/null +++ b/weed/filer2/redis2/universal_redis_store.go @@ -0,0 +1,162 @@ +package redis2 + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedis2Store struct { + Client redis.UniversalClient +} + +func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + if name != "" { + if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { + + data, err := store.Client.Get(string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks([]byte(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + _, err = store.Client.Del(string(fullpath)).Result() + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + dir, name := fullpath.DirAndName() + if name != "" { + _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + + members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() + if err != nil { + return fmt.Errorf("delete folder %s : %v", fullpath, err) + } + + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + _, err = store.Client.Del(string(path)).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer2.Entry, err error) { + + dirListKey := genDirectoryListKey(string(fullpath)) + start := int64(0) + if startFileName != "" { + start, _ = store.Client.ZRank(dirListKey, startFileName).Result() + if !inclusive { + start++ + } + } + members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result() + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + entry, err := store.FindEntry(ctx, path) + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(string(path)).Result() + store.Client.ZRem(dirListKey, fileName).Result() + continue + } + } + entries = append(entries, entry) + } + } + + return entries, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedis2Store) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 01b87cad1..033a8dd13 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -1,7 +1,10 @@ package filer2 import ( + "bytes" "io" + "math" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -9,7 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) -func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error { +func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { chunkViews := ViewFromChunks(chunks, offset, size) @@ -26,8 +29,9 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f } for _, chunkView := range chunkViews { + urlString := fileId2Url[chunkView.FileId] - _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { + err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { w.Write(data) }) if err != nil { @@ -39,3 +43,157 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f return nil } + +// ---------------- ReadAllReader ---------------------------------- + +func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { + + buffer := bytes.Buffer{} + + chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + + lookupFileId := func(fileId string) (targetUrl string, err error) { + return masterClient.LookupFileId(fileId) + } + + for _, chunkView := range chunkViews { + urlString, err := lookupFileId(chunkView.FileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return nil, err + } + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + return nil, err + } + } + return buffer.Bytes(), nil +} + +// ---------------- ChunkStreamReader ---------------------------------- +type ChunkStreamReader struct { + chunkViews []*ChunkView + logicOffset int64 + buffer []byte + bufferOffset int64 + bufferPos int + chunkIndex int + lookupFileId LookupFileIdFunctionType +} + +var _ = io.ReadSeeker(&ChunkStreamReader{}) + +func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: func(fileId string) (targetUrl string, err error) { + return masterClient.LookupFileId(fileId) + }, + } +} + +func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: LookupFn(filerClient), + } +} + +func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { + for n < len(p) { + if c.isBufferEmpty() { + if c.chunkIndex >= len(c.chunkViews) { + return n, io.EOF + } + chunkView := c.chunkViews[c.chunkIndex] + c.fetchChunkToBuffer(chunkView) + c.chunkIndex++ + } + t := copy(p[n:], c.buffer[c.bufferPos:]) + c.bufferPos += t + n += t + } + return +} + +func (c *ChunkStreamReader) isBufferEmpty() bool { + return len(c.buffer) <= c.bufferPos +} + +func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { + + var totalSize int64 + for _, chunk := range c.chunkViews { + totalSize += int64(chunk.Size) + } + + var err error + switch whence { + case io.SeekStart: + case io.SeekCurrent: + offset += c.bufferOffset + int64(c.bufferPos) + case io.SeekEnd: + offset = totalSize + offset + } + if offset > totalSize { + err = io.ErrUnexpectedEOF + } + + for i, chunk := range c.chunkViews { + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + c.fetchChunkToBuffer(chunk) + c.chunkIndex = i + 1 + break + } + } + } + c.bufferPos = int(offset - c.bufferOffset) + + return offset, err + +} + +func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { + urlString, err := c.lookupFileId(chunkView.FileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return err + } + var buffer bytes.Buffer + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + return err + } + c.buffer = buffer.Bytes() + c.bufferPos = 0 + c.bufferOffset = chunkView.LogicOffset + + // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + + return nil +} + +func (c *ChunkStreamReader) Close() { + // TODO try to release and reuse buffer +} + +func VolumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go deleted file mode 100644 index 24e05e3ad..000000000 --- a/weed/filer2/tikv/tikv_store.go +++ /dev/null @@ -1,251 +0,0 @@ -// +build !386 -// +build !arm - -package tikv - -import ( - "bytes" - "context" - "crypto/md5" - "fmt" - "io" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" - - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/store/tikv" -) - -func init() { - filer2.Stores = append(filer2.Stores, &TikvStore{}) -} - -type TikvStore struct { - store kv.Storage -} - -func (store *TikvStore) GetName() string { - return "tikv" -} - -func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - pdAddr := configuration.GetString(prefix + "pdAddress") - return store.initialize(pdAddr) -} - -func (store *TikvStore) initialize(pdAddr string) (err error) { - glog.Infof("filer store tikv pd address: %s", pdAddr) - - driver := tikv.Driver{} - - store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr)) - - if err != nil { - return fmt.Errorf("open tikv %s : %v", pdAddr, err) - } - - return -} - -func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { - tx, err := store.store.Begin() - if err != nil { - return ctx, err - } - return context.WithValue(ctx, "tx", tx), nil -} -func (store *TikvStore) CommitTransaction(ctx context.Context) error { - tx, ok := ctx.Value("tx").(kv.Transaction) - if ok { - return tx.Commit(ctx) - } - return nil -} -func (store *TikvStore) RollbackTransaction(ctx context.Context) error { - tx, ok := ctx.Value("tx").(kv.Transaction) - if ok { - return tx.Rollback() - } - return nil -} - -func (store *TikvStore) getTx(ctx context.Context) kv.Transaction { - if tx, ok := ctx.Value("tx").(kv.Transaction); ok { - return tx - } - return nil -} - -func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - dir, name := entry.DirAndName() - key := genKey(dir, name) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - err = store.getTx(ctx).Set(key, value) - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) - - return nil -} - -func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - dir, name := fullpath.DirAndName() - key := genKey(dir, name) - - data, err := store.getTx(ctx).Get(ctx, key) - - if err == kv.ErrNotExist { - return nil, filer2.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) - - return entry, nil -} - -func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - dir, name := fullpath.DirAndName() - key := genKey(dir, name) - - err = store.getTx(ctx).Delete(key) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - tx := store.getTx(ctx) - - iter, err := tx.Iter(directoryPrefix, nil) - if err != nil { - return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err) - } - defer iter.Close() - for iter.Valid() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - iter.Next() - continue - } - - if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - iter.Next() - } - - return nil -} - -func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName) - - iter, err := store.getTx(ctx).Iter(lastFileStart, nil) - if err != nil { - return nil, fmt.Errorf("list %s: %v", fullpath, err) - } - defer iter.Close() - for iter.Valid() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - iter.Next() - continue - } - if fileName == startFileName && !inclusive { - iter.Next() - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), - } - - // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - iter.Next() - } - - return entries, err -} - -func genKey(dirPath, fileName string) (key []byte) { - key = hashToBytes(dirPath) - key = append(key, []byte(fileName)...) - return key -} - -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { - keyPrefix = hashToBytes(string(fullpath)) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix -} - -func getNameFromKey(key []byte) string { - - return string(key[md5.Size:]) - -} - -// hash directory -func hashToBytes(dir string) []byte { - h := md5.New() - io.WriteString(h, dir) - - b := h.Sum(nil) - - return b -} diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go deleted file mode 100644 index daf29612e..000000000 --- a/weed/filer2/tikv/tikv_store_unsupported.go +++ /dev/null @@ -1,65 +0,0 @@ -// +build 386 arm - -package tikv - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -func init() { - filer2.Stores = append(filer2.Stores, &TikvStore{}) -} - -type TikvStore struct { -} - -func (store *TikvStore) GetName() string { - return "tikv" -} - -func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) initialize(pdAddr string) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return nil, fmt.Errorf("not implemented for 32 bit computers") -} -func (store *TikvStore) CommitTransaction(ctx context.Context) error { - return fmt.Errorf("not implemented for 32 bit computers") -} -func (store *TikvStore) RollbackTransaction(ctx context.Context) error { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - return nil, fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - return nil, fmt.Errorf("not implemented for 32 bit computers") -} diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go new file mode 100644 index 000000000..9c6e5c88d --- /dev/null +++ b/weed/filer2/topics.go @@ -0,0 +1,6 @@ +package filer2 + +const ( + TopicsDir = "/topics" + SystemLogDir = TopicsDir + "/.system/log" +) |
