Diffstat (limited to 'weed/filer')
34 files changed, 862 insertions, 224 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go index 9f83da4aa..ddb339bfb 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -24,6 +24,8 @@ type Attr struct { SymlinkTarget string Md5 []byte FileSize uint64 + Rdev uint32 + Inode uint64 } func (attr Attr) IsDirectory() bool { diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go index 0a917bea9..683e83cde 100644 --- a/weed/filer/entry_codec.go +++ b/weed/filer/entry_codec.go @@ -48,6 +48,8 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { SymlinkTarget: entry.Attr.SymlinkTarget, Md5: entry.Attr.Md5, FileSize: entry.Attr.FileSize, + Rdev: entry.Attr.Rdev, + Inode: entry.Attr.Inode, } } @@ -74,6 +76,8 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t.SymlinkTarget = attr.SymlinkTarget t.Md5 = attr.Md5 t.FileSize = attr.FileSize + t.Rdev = attr.Rdev + t.Inode = attr.Inode return t } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index b6a64b30d..a1f84b38e 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -8,6 +8,7 @@ import ( "math" "net/url" "strings" + "sync" "time" "github.com/golang/protobuf/proto" @@ -21,6 +22,12 @@ const ( ManifestBatch = 10000 ) +var bytesBufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { for _, chunk := range chunks { if chunk.IsChunkManifest { @@ -61,12 +68,12 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun manifestChunks = append(manifestChunks, chunk) // recursive - dataChunks, manifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) + dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { return chunks, nil, subErr } - dataChunks = append(dataChunks, dataChunks...) - manifestChunks = append(manifestChunks, manifestChunks...) + dataChunks = append(dataChunks, dchunks...) + manifestChunks = append(manifestChunks, mchunks...) } return } @@ -77,12 +84,15 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // IsChunkManifest - data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) + bytesBuffer.Reset() + defer bytesBufferPool.Put(bytesBuffer) + err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) if err != nil { return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) } m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { + if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil { return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) } @@ -92,38 +102,43 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { +func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err + return err + } + err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0) + if err != nil { + return err } - return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0) + return nil } -func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) { +func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err + return 0, err } - return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size) + return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } -func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) { +func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { - var err error var shouldRetry bool - receivedData := make([]byte, 0, size) for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - receivedData = receivedData[:0] + n = 0 if strings.Contains(urlString, "%") { urlString = url.PathEscape(urlString) } - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { - receivedData = append(receivedData, data...) 
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + if n < len(buffer) { + x := copy(buffer[n:], data) + n += x + } }) if !shouldRetry { break @@ -142,7 +157,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool } } - return receivedData, err + return n, err } diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index d18d06f2c..fd9694b38 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -23,7 +23,13 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { } func FileSize(entry *filer_pb.Entry) (size uint64) { - return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize) + fileSize := entry.Attributes.FileSize + if entry.RemoteEntry != nil { + if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime { + fileSize = maxUint64(fileSize, uint64(entry.RemoteEntry.RemoteSize)) + } + } + return maxUint64(TotalSize(entry.Chunks), fileSize) } func ETag(entry *filer_pb.Entry) (etag string) { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 0f34adb4d..836a0e447 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -49,7 +49,7 @@ type Filer struct { UniqueFileId uint32 } -func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, +func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters), @@ -151,7 +151,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error { return f.Store.RollbackTransaction(ctx) } -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool) error { if string(entry.FullPath) == "/" { return nil @@ -169,9 +169,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr if oldEntry == nil { - dirParts := strings.Split(string(entry.FullPath), "/") - if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { - return err + if !skipCreateParentDir { + dirParts := strings.Split(string(entry.FullPath), "/") + if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } } glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name()) diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index bda69b15f..c774f5d27 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -9,8 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -type HardLinkId []byte - const ( MsgFailDelNonEmptyFolder = "fail to delete non-empty folder" ) @@ -127,7 +125,11 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) - if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil { + if !entry.IsDirectory() && !shouldDeleteChunks { + if storeDeletionErr := f.Store.DeleteOneEntrySkipHardlink(ctx, entry.FullPath); storeDeletionErr != nil { + return 
fmt.Errorf("filer store delete skip hardlink: %v", storeDeletionErr) + } + } else if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil { return fmt.Errorf("filer store delete: %v", storeDeletionErr) } if !entry.IsDirectory() { diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index e0191d7f1..e73f94151 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -1,6 +1,7 @@ package filer import ( + "math" "strings" "time" @@ -129,6 +130,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { } } +func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) { + for _, chunk := range chunks { + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + } +} + func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { if oldEntry == nil { @@ -141,14 +148,36 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { var toDelete []*filer_pb.FileChunk newChunkIds := make(map[string]bool) - for _, newChunk := range newEntry.Chunks { + newDataChunks, newManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(), + newEntry.Chunks, 0, math.MaxInt64) + if err != nil { + glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s", + newEntry.Chunks, oldEntry.Chunks) + return + } + for _, newChunk := range newDataChunks { + newChunkIds[newChunk.GetFileIdString()] = true + } + for _, newChunk := range newManifestChunks { newChunkIds[newChunk.GetFileIdString()] = true } - for _, oldChunk := range oldEntry.Chunks { + oldDataChunks, oldManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(), + oldEntry.Chunks, 0, math.MaxInt64) + if err != nil { + glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. 
new: %s, old: %s", + newEntry.Chunks, oldEntry.Chunks) + return + } + for _, oldChunk := range oldDataChunks { + if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { + toDelete = append(toDelete, oldChunk) + } + } + for _, oldChunk := range oldManifestChunks { if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(toDelete) + f.DeleteChunksNotRecursive(toDelete) } diff --git a/weed/filer/filer_hardlink.go b/weed/filer/filer_hardlink.go new file mode 100644 index 000000000..7a91602fd --- /dev/null +++ b/weed/filer/filer_hardlink.go @@ -0,0 +1,16 @@ +package filer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + HARD_LINK_MARKER = '\x01' +) + +type HardLinkId []byte // 16 bytes + 1 marker byte + +func NewHardLinkId() HardLinkId { + bytes := append(util.RandomBytes(16), HARD_LINK_MARKER) + return bytes +} diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index e30ef4e54..25b99d0f7 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -43,7 +43,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) // update the entry - err = f.CreateEntry(context.Background(), entry, false, false, nil) + err = f.CreateEntry(context.Background(), entry, false, false, nil, false) return err } diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 720e019f4..3b290deca 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -22,12 +22,12 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { } } if f.DirBucketsPath == event.Directory { - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(event) { if message.NewEntry.IsDirectory { f.Store.OnBucketCreation(message.NewEntry.Name) } } - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(event) { if message.OldEntry.IsDirectory { f.Store.OnBucketDeletion(message.OldEntry.Name) } diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go index 316c76a0c..ae2f5fee7 100644 --- a/weed/filer/filerstore_hardlink.go +++ b/weed/filer/filerstore_hardlink.go @@ -9,16 +9,20 @@ import ( ) func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error { - if len(entry.HardLinkId) == 0 { + + if entry.IsDirectory() { return nil } - // handle hard links - if err := fsw.setHardLink(ctx, entry); err != nil { - return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err) + + if len(entry.HardLinkId) > 0 { + // handle hard links + if err := fsw.setHardLink(ctx, entry); err != nil { + return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err) + } } // check what is existing entry - glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) + // glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) actualStore := fsw.getActualStore(entry.FullPath) existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath) if err != nil && err != filer_pb.ErrNotFound { @@ -46,6 +50,8 @@ func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) err return encodeErr } + glog.V(4).Infof("setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter) + return fsw.KvPut(ctx, key, newBlob) } @@ -55,7 +61,6 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx 
context.Context, entry *Entr } key := entry.HardLinkId - glog.V(4).Infof("maybeReadHardLink KvGet %v", key) value, err := fsw.KvGet(ctx, key) if err != nil { glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) @@ -67,6 +72,8 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr return err } + glog.V(4).Infof("maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter) + return nil } diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index ca531dc3a..7f5d9729d 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -23,6 +23,7 @@ type VirtualFilerStore interface { FilerStore DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error + DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error AddPathSpecificStore(path string, storeId string, store FilerStore) OnBucketCreation(bucket string) OnBucketDeletion(bucket string) @@ -127,7 +128,7 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err return err } - glog.V(4).Infof("InsertEntry %s", entry.FullPath) + // glog.V(4).Infof("InsertEntry %s", entry.FullPath) return actualStore.InsertEntry(ctx, entry) } @@ -148,7 +149,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err return err } - glog.V(4).Infof("UpdateEntry %s", entry.FullPath) + // glog.V(4).Infof("UpdateEntry %s", entry.FullPath) return actualStore.UpdateEntry(ctx, entry) } @@ -192,7 +193,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) } } - glog.V(4).Infof("DeleteEntry %s", fp) + // glog.V(4).Infof("DeleteEntry %s", fp) return actualStore.DeleteEntry(ctx, fp) } @@ -212,10 +213,22 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry } } - glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) + // glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) return actualStore.DeleteEntry(ctx, existingEntry.FullPath) } +func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) { + actualStore := fsw.getActualStore(fullpath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath) + return actualStore.DeleteEntry(ctx, fullpath) +} + func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { actualStore := fsw.getActualStore(fp + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() @@ -224,7 +237,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util. 
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) }() - glog.V(4).Infof("DeleteFolderChildren %s", fp) + // glog.V(4).Infof("DeleteFolderChildren %s", fp) return actualStore.DeleteFolderChildren(ctx, fp) } @@ -236,7 +249,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) }() - glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) + // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) @@ -254,7 +267,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, if limit > math.MaxInt32-1 { limit = math.MaxInt32 - 1 } - glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) + // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) adjustedEntryFunc := func(entry *Entry) bool { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) @@ -285,8 +298,10 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u count := int64(0) for count < limit && len(notPrefixed) > 0 { + var isLastItemHasPrefix bool for _, entry := range notPrefixed { if strings.HasPrefix(entry.Name(), prefix) { + isLastItemHasPrefix = true count++ if !eachEntryFunc(entry) { return @@ -294,9 +309,11 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u if count >= limit { break } + } else { + isLastItemHasPrefix = false } } - if count < limit { + if count < limit && isLastItemHasPrefix && len(notPrefixed) == int(limit) { notPrefixed = notPrefixed[:0] lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { notPrefixed = append(notPrefixed, entry) @@ -305,6 +322,8 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u if err != nil { return } + } else { + break } } return diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index 2476e063c..2496add4b 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -13,8 +13,7 @@ import ( func TestCreateAndFind(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -32,7 +31,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -67,8 +66,7 @@ func TestCreateAndFind(t *testing.T) { func TestEmptyRoot(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -90,8 +88,7 @@ func 
TestEmptyRoot(t *testing.T) { func BenchmarkInsertEntry(b *testing.B) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_bench") - defer os.RemoveAll(dir) + dir := b.TempDir() store := &LevelDBStore{} store.initialize(dir) testFiler.SetStore(store) diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go index 93c622fd9..f04ffc049 100644 --- a/weed/filer/leveldb2/leveldb2_store_test.go +++ b/weed/filer/leveldb2/leveldb2_store_test.go @@ -2,7 +2,6 @@ package leveldb import ( "context" - "os" "testing" "github.com/chrislusf/seaweedfs/weed/filer" @@ -11,8 +10,7 @@ import ( func TestCreateAndFind(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) testFiler.SetStore(store) @@ -30,7 +28,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -65,8 +63,7 @@ func TestCreateAndFind(t *testing.T) { func TestEmptyRoot(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) testFiler.SetStore(store) diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index 86e2b584b..e448f0093 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -72,8 +72,8 @@ func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) { } if name != DEFAULT { opts = &opt.Options{ - BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB + BlockCacheCapacity: 16 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 8 * 1024 * 1024, // default value is 4MiB Filter: bloom, } } diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go index a5e97cf10..dcd390a0c 100644 --- a/weed/filer/leveldb3/leveldb3_store_test.go +++ b/weed/filer/leveldb3/leveldb3_store_test.go @@ -2,7 +2,6 @@ package leveldb import ( "context" - "os" "testing" "github.com/chrislusf/seaweedfs/weed/filer" @@ -11,8 +10,7 @@ import ( func TestCreateAndFind(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) testFiler.SetStore(store) @@ -30,7 +28,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -65,8 +63,7 @@ func TestCreateAndFind(t *testing.T) { func TestEmptyRoot(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) testFiler.SetStore(store) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 1e8b89ad5..83c8a945d 100644 
--- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -76,9 +76,6 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (no } } else { if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] -= 1 - } - if ma.peerStatues[address] <= 0 { delete(ma.peerStatues, address) } } diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 6935be1ab..c12354ad6 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -159,7 +159,7 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa dir, name := fullpath.DirAndName() where := bson.M{"directory": dir, "name": name} - _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where) + _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) } @@ -199,9 +199,9 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti for cur.Next(ctx) { var data Model - err := cur.Decode(&data) - if err != nil && err != mongo.ErrNoDocuments { - return lastFileName, err + err = cur.Decode(&data) + if err != nil { + break } entry := &filer.Entry{ diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go index a1f54455a..e50480150 100644 --- a/weed/filer/mysql2/mysql2_store.go +++ b/weed/filer/mysql2/mysql2_store.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -82,7 +83,7 @@ func (store *MysqlStore2) initialize(createTable, upsertQuery string, enableUpse return fmt.Errorf("connect to %s error:%v", sqlUrl, err) } - if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { + if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil && !strings.Contains(err.Error(), "table already exist") { return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) } diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 3b6a69fb6..19d98e99e 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -62,6 +62,7 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte }, Content: content, }, + SkipCheckParentDirectory: true, }) } else if err == nil { entry := resp.Entry diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index b1c15152f..7d9997761 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -12,21 +12,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/golang/groupcache/singleflight" ) type ChunkReadAt struct { - masterClient *wdclient.MasterClient - chunkViews []*ChunkView - lookupFileId wdclient.LookupFileIdFunctionType - readerLock sync.Mutex - fileSize int64 - - fetchGroup singleflight.Group - chunkCache chunk_cache.ChunkCache - lastChunkFileId string - lastChunkData []byte - readerPattern *ReaderPattern + masterClient *wdclient.MasterClient + chunkViews []*ChunkView + readerLock sync.Mutex + fileSize int64 + readerCache *ReaderCache + readerPattern *ReaderPattern + lastChunkFid string } var _ = io.ReaderAt(&ChunkReadAt{}) @@ -90,16 +85,14 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun return &ChunkReadAt{ chunkViews: 
chunkViews, - lookupFileId: lookupFn, - chunkCache: chunkCache, fileSize: fileSize, + readerCache: newReaderCache(32, chunkCache, lookupFn), readerPattern: NewReaderPattern(), } } func (c *ChunkReadAt) Close() error { - c.lastChunkData = nil - c.lastChunkFileId = "" + c.readerCache.destroy() return nil } @@ -117,15 +110,13 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { startOffset, remaining := offset, int64(len(p)) - var nextChunk *ChunkView + var nextChunks []*ChunkView for i, chunk := range c.chunkViews { if remaining <= 0 { break } if i+1 < len(c.chunkViews) { - nextChunk = c.chunkViews[i+1] - } else { - nextChunk = nil + nextChunks = c.chunkViews[i+1:] } if startOffset < chunk.LogicOffset { gap := int(chunk.LogicOffset - startOffset) @@ -142,16 +133,13 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { continue } // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - var buffer []byte bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset - bufferLength := chunkStop - chunkStart - buffer, err = c.readChunkSlice(chunk, nextChunk, uint64(bufferOffset), uint64(bufferLength)) + copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset)) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - return + return copied, err } - copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer) n += copied startOffset, remaining = startOffset+int64(copied), remaining-int64(copied) } @@ -173,104 +161,25 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) { +func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) { - var chunkSlice []byte - if chunkView.LogicOffset == 0 { - chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length) - } - if len(chunkSlice) > 0 { - return chunkSlice, nil - } - if c.lookupFileId == nil { - return nil, nil - } if c.readerPattern.IsRandomMode() { - return c.doFetchRangeChunkData(chunkView, offset, length) - } - chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews) - if err != nil { - return nil, err - } - wanted := min(int64(length), int64(len(chunkData))-int64(offset)) - return chunkData[offset : int64(offset)+wanted], nil -} - -func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) { - - if c.lastChunkFileId == chunkView.FileId { - return c.lastChunkData, nil - } - - v, doErr := c.readOneWholeChunk(chunkView) - - if doErr != nil { - return nil, doErr + return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset)) } - chunkData = v.([]byte) - - c.lastChunkData = chunkData - c.lastChunkFileId = chunkView.FileId - - for _, nextChunkView := range nextChunkViews { - if c.chunkCache != nil && nextChunkView != nil { - go c.readOneWholeChunk(nextChunkView) + n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), 
int(chunkView.ChunkSize), chunkView.LogicOffset == 0) + if c.lastChunkFid != chunkView.FileId { + if chunkView.Offset == 0 { // start of a new chunk + if c.lastChunkFid != "" { + c.readerCache.UnCache(c.lastChunkFid) + c.readerCache.MaybeCache(nextChunkViews) + } else { + if len(nextChunkViews) >= 1 { + c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning + } + } } } - + c.lastChunkFid = chunkView.FileId return } - -func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) { - - var err error - - return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) { - - glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) - - var data []byte - if chunkView.LogicOffset == 0 { - data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) - } - if data != nil { - glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data))) - } else { - var err error - data, err = c.doFetchFullChunkData(chunkView) - if err != nil { - return data, err - } - if chunkView.LogicOffset == 0 { - // only cache the first chunk - c.chunkCache.SetChunk(chunkView.FileId, data) - } - } - return data, err - }) -} - -func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) { - - glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId) - - data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) - - glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId) - - return data, err - -} - -func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) { - - glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId) - - data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length)) - - glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId) - - return data, err - -} diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go index 411d7eb3b..d9afb460c 100644 --- a/weed/filer/reader_at_test.go +++ b/weed/filer/reader_at_test.go @@ -21,8 +21,12 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { return data } -func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte { - return m.GetChunk(fileId, length) +func (m *mockChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) { + x, _ := strconv.Atoi(fileId) + for i := 0; i < len(data); i++ { + data[i] = byte(x) + } + return len(data), nil } func (m *mockChunkCache) SetChunk(fileId string, data []byte) { @@ -65,10 +69,9 @@ func TestReaderAt(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, readerLock: sync.Mutex{}, fileSize: 10, - chunkCache: &mockChunkCache{}, + readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } @@ -81,7 +84,7 @@ func TestReaderAt(t *testing.T) { func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) { data := make([]byte, size) - n, err := readerAt.ReadAt(data, offset) + n, err := readerAt.doReadAt(data, offset) for _, d := range data { fmt.Printf("%x", d) @@ -116,10 +119,9 @@ 
func TestReaderAt0(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, readerLock: sync.Mutex{}, fileSize: 10, - chunkCache: &mockChunkCache{}, + readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } @@ -145,10 +147,9 @@ func TestReaderAt1(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, readerLock: sync.Mutex{}, fileSize: 20, - chunkCache: &mockChunkCache{}, + readerCache: newReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go new file mode 100644 index 000000000..bce97cc49 --- /dev/null +++ b/weed/filer/reader_cache.go @@ -0,0 +1,192 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "sync" + "time" +) + +type ReaderCache struct { + chunkCache chunk_cache.ChunkCache + lookupFileIdFn wdclient.LookupFileIdFunctionType + sync.Mutex + downloaders map[string]*SingleChunkCacher + limit int +} + +type SingleChunkCacher struct { + sync.RWMutex + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + completedTime time.Time +} + +func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { + return &ReaderCache{ + limit: limit, + chunkCache: chunkCache, + lookupFileIdFn: lookupFileIdFn, + downloaders: make(map[string]*SingleChunkCacher), + } +} + +func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { + if rc.lookupFileIdFn == nil { + return + } + + rc.Lock() + defer rc.Unlock() + + for _, chunkView := range chunkViews { + if _, found := rc.downloaders[chunkView.FileId]; found { + continue + } + + if len(rc.downloaders) >= rc.limit { + // if still no slots, return + return + } + + // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset) + // cache this chunk if not yet + cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false) + cacher.wg.Add(1) + go cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[chunkView.FileId] = cacher + + } + + return +} + +func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { + rc.Lock() + defer rc.Unlock() + if cacher, found := rc.downloaders[fileId]; found { + return cacher.readChunkAt(buffer, offset) + } + if shouldCache || rc.lookupFileIdFn == nil { + n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) + if n > 0 { + return n, err + } + } + + if len(rc.downloaders) >= rc.limit { + oldestFid, oldestTime := "", time.Now() + for fid, downloader := range rc.downloaders { + if !downloader.completedTime.IsZero() { + if downloader.completedTime.Before(oldestTime) { + oldestFid, oldestTime = fid, downloader.completedTime + } + } + } + if oldestFid != "" { + oldDownloader := rc.downloaders[oldestFid] + delete(rc.downloaders, oldestFid) + oldDownloader.destroy() + } + } + + // glog.V(4).Infof("cache1 %s", fileId) + + cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache) + cacher.wg.Add(1) + go 
cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[fileId] = cacher + + return cacher.readChunkAt(buffer, offset) +} + +func (rc *ReaderCache) UnCache(fileId string) { + rc.Lock() + defer rc.Unlock() + // glog.V(4).Infof("uncache %s", fileId) + if downloader, found := rc.downloaders[fileId]; found { + downloader.destroy() + delete(rc.downloaders, fileId) + } +} + +func (rc *ReaderCache) destroy() { + rc.Lock() + defer rc.Unlock() + + for _, downloader := range rc.downloaders { + downloader.destroy() + } + +} + +func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher { + t := &SingleChunkCacher{ + parent: parent, + chunkFileId: fileId, + cipherKey: cipherKey, + isGzipped: isGzipped, + chunkSize: chunkSize, + shouldCache: shouldCache, + } + return t +} + +func (s *SingleChunkCacher) startCaching() { + s.Lock() + defer s.Unlock() + + s.wg.Done() // means this has been started + + urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId) + if err != nil { + s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) + return + } + + s.data = mem.Allocate(s.chunkSize) + + _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + if s.err != nil { + mem.Free(s.data) + s.data = nil + return + } + + s.completedTime = time.Now() + if s.shouldCache { + s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) + } + + return +} + +func (s *SingleChunkCacher) destroy() { + if s.data != nil { + mem.Free(s.data) + s.data = nil + } +} + +func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { + s.RLock() + defer s.RUnlock() + + if s.err != nil { + return 0, s.err + } + + return copy(buf, s.data[offset:]), nil + +} diff --git a/weed/filer/redis_lua/redis_cluster_store.go b/weed/filer/redis_lua/redis_cluster_store.go new file mode 100644 index 000000000..b68d1092c --- /dev/null +++ b/weed/filer/redis_lua/redis_cluster_store.go @@ -0,0 +1,44 @@ +package redis_lua + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisLuaClusterStore{}) +} + +type RedisLuaClusterStore struct { + UniversalRedisLuaStore +} + +func (store *RedisLuaClusterStore) GetName() string { + return "redis_lua_cluster" +} + +func (store *RedisLuaClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", false) + configuration.SetDefault(prefix+"routeByLatency", false) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), + ) +} + +func (store *RedisLuaClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + store.loadSuperLargeDirectories(superLargeDirectories) + return +} diff --git a/weed/filer/redis_lua/redis_sentinel_store.go b/weed/filer/redis_lua/redis_sentinel_store.go new file mode 100644 index 000000000..5530c098e --- /dev/null +++ 
b/weed/filer/redis_lua/redis_sentinel_store.go @@ -0,0 +1,45 @@ +package redis_lua + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "time" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisLuaSentinelStore{}) +} + +type RedisLuaSentinelStore struct { + UniversalRedisLuaStore +} + +func (store *RedisLuaSentinelStore) GetName() string { + return "redis_lua_sentinel" +} + +func (store *RedisLuaSentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"masterName"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { + store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: masterName, + SentinelAddrs: addresses, + Username: username, + Password: password, + DB: database, + MinRetryBackoff: time.Millisecond * 100, + MaxRetryBackoff: time.Minute * 1, + ReadTimeout: time.Second * 30, + WriteTimeout: time.Second * 5, + }) + return +} diff --git a/weed/filer/redis_lua/redis_store.go b/weed/filer/redis_lua/redis_store.go new file mode 100644 index 000000000..a7d11c73c --- /dev/null +++ b/weed/filer/redis_lua/redis_store.go @@ -0,0 +1,38 @@ +package redis_lua + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisLuaStore{}) +} + +type RedisLuaStore struct { + UniversalRedisLuaStore +} + +func (store *RedisLuaStore) GetName() string { + return "redis_lua" +} + +func (store *RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), + ) +} + +func (store *RedisLuaStore) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + store.loadSuperLargeDirectories(superLargeDirectories) + return +} diff --git a/weed/filer/redis_lua/stored_procedure/delete_entry.lua b/weed/filer/redis_lua/stored_procedure/delete_entry.lua new file mode 100644 index 000000000..445337c77 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/delete_entry.lua @@ -0,0 +1,19 @@ +-- KEYS[1]: full path of entry +local fullpath = KEYS[1] +-- KEYS[2]: full path of entry +local fullpath_list_key = KEYS[2] +-- KEYS[3]: dir of the entry +local dir_list_key = KEYS[3] + +-- ARGV[1]: isSuperLargeDirectory +local isSuperLargeDirectory = ARGV[1] == "1" +-- ARGV[2]: name of the entry +local name = ARGV[2] + +redis.call("DEL", fullpath, fullpath_list_key) + +if not isSuperLargeDirectory and name ~= "" then + redis.call("ZREM", dir_list_key, name) +end + +return 0
\ No newline at end of file diff --git a/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua new file mode 100644 index 000000000..77e4839f9 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua @@ -0,0 +1,15 @@ +-- KEYS[1]: full path of entry +local fullpath = KEYS[1] + +if fullpath ~= "" and string.sub(fullpath, -1) == "/" then + fullpath = string.sub(fullpath, 0, -2) +end + +local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1") + +for _, name in ipairs(files) do + local file_path = fullpath .. "/" .. name + redis.call("DEL", file_path, file_path .. "\0") +end + +return 0
\ No newline at end of file diff --git a/weed/filer/redis_lua/stored_procedure/init.go b/weed/filer/redis_lua/stored_procedure/init.go new file mode 100644 index 000000000..1412ceba2 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/init.go @@ -0,0 +1,24 @@ +package stored_procedure + +import ( + _ "embed" + "github.com/go-redis/redis/v8" +) + +func init() { + InsertEntryScript = redis.NewScript(insertEntry) + DeleteEntryScript = redis.NewScript(deleteEntry) + DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren) +} + +//go:embed insert_entry.lua +var insertEntry string +var InsertEntryScript *redis.Script + +//go:embed delete_entry.lua +var deleteEntry string +var DeleteEntryScript *redis.Script + +//go:embed delete_folder_children.lua +var deleteFolderChildren string +var DeleteFolderChildrenScript *redis.Script diff --git a/weed/filer/redis_lua/stored_procedure/insert_entry.lua b/weed/filer/redis_lua/stored_procedure/insert_entry.lua new file mode 100644 index 000000000..8deef3446 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/insert_entry.lua @@ -0,0 +1,27 @@ +-- KEYS[1]: full path of entry +local full_path = KEYS[1] +-- KEYS[2]: dir of the entry +local dir_list_key = KEYS[2] + +-- ARGV[1]: content of the entry +local entry = ARGV[1] +-- ARGV[2]: TTL of the entry +local ttlSec = tonumber(ARGV[2]) +-- ARGV[3]: isSuperLargeDirectory +local isSuperLargeDirectory = ARGV[3] == "1" +-- ARGV[4]: zscore of the entry in zset +local zscore = tonumber(ARGV[4]) +-- ARGV[5]: name of the entry +local name = ARGV[5] + +if ttlSec > 0 then + redis.call("SET", full_path, entry, "EX", ttlSec) +else + redis.call("SET", full_path, entry) +end + +if not isSuperLargeDirectory and name ~= "" then + redis.call("ZADD", dir_list_key, "NX", zscore, name) +end + +return 0
\ No newline at end of file diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go new file mode 100644 index 000000000..9674ac03f --- /dev/null +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -0,0 +1,191 @@ +package redis_lua + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis/v8" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/redis_lua/stored_procedure" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedisLuaStore struct { + Client redis.UniversalClient + superLargeDirectoryHash map[string]bool +} + +func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) { + _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) { + // set directory hash + store.superLargeDirectoryHash = make(map[string]bool) + for _, dir := range superLargeDirectories { + store.superLargeDirectoryHash[dir] = true + } +} + +func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + + dir, name := entry.FullPath.DirAndName() + + err = stored_procedure.InsertEntryScript.Run(ctx, store.Client, + []string{string(entry.FullPath), genDirectoryListKey(dir)}, + value, entry.TtlSec, + store.isSuperLargeDirectory(dir), 0, name).Err() + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + data, err := store.Client.Get(ctx, string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + dir, name := fullpath.DirAndName() + + err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client, + []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)}, + store.isSuperLargeDirectory(dir), name).Err() + + if err != nil { + return fmt.Errorf("DeleteEntry %s : %v", fullpath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath 
util.FullPath) (err error) { + + if store.isSuperLargeDirectory(string(fullpath)) { + return nil + } + + err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client, + []string{string(fullpath)}).Err() + + if err != nil { + return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + dirListKey := genDirectoryListKey(string(dirPath)) + + min := "-" + if startFileName != "" { + if includeStartFile { + min = "[" + startFileName + } else { + min = "(" + startFileName + } + } + + members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{ + Min: min, + Max: "+", + Offset: 0, + Count: limit, + }).Result() + if err != nil { + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(dirPath), fileName) + entry, err := store.FindEntry(ctx, path) + lastFileName = fileName + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + if err == filer_pb.ErrNotFound { + continue + } + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.DeleteEntry(ctx, path) + continue + } + } + if !eachEntryFunc(entry) { + break + } + } + } + + return lastFileName, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedisLuaStore) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/redis_lua/universal_redis_store_kv.go b/weed/filer/redis_lua/universal_redis_store_kv.go new file mode 100644 index 000000000..3df980b66 --- /dev/null +++ b/weed/filer/redis_lua/universal_redis_store_kv.go @@ -0,0 +1,42 @@ +package redis_lua + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/go-redis/redis/v8" +) + +func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + _, err = store.Client.Set(ctx, string(key), value, 0).Result() + + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + data, err := store.Client.Get(ctx, string(key)).Result() + + if err == redis.Nil { + return nil, filer.ErrKvNotFound + } + + return []byte(data), err +} + +func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) { + + _, err = store.Client.Del(ctx, string(key)).Result() + + if err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go index fbf8b3112..faabcd341 100644 --- a/weed/filer/rocksdb/rocksdb_store_test.go +++ b/weed/filer/rocksdb/rocksdb_store_test.go @@ -16,8 +16,7 @@ import ( func TestCreateAndFind(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := 
os.MkdirTemp("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -35,7 +34,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -70,8 +69,7 @@ func TestCreateAndFind(t *testing.T) { func TestEmptyRoot(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -93,8 +91,7 @@ func TestEmptyRoot(t *testing.T) { func BenchmarkInsertEntry(b *testing.B) { testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := os.MkdirTemp("", "seaweedfs_filer_bench") - defer os.RemoveAll(dir) + dir := b.TempDir() store := &RocksDBStore{} store.initialize(dir) testFiler.SetStore(store) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index e5163f2d9..36278f0b1 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -62,7 +62,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) + glog.V(4).Infof("start to stream content for chunks: %+v", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -104,10 +104,12 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() } - glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) - err := writeZero(writer, remaining) - if err != nil { - return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + if remaining > 0 { + glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) + err := writeZero(writer, remaining) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + } } return nil @@ -133,30 +135,30 @@ func writeZero(w io.Writer, size int64) (err error) { return } -func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { - - buffer := bytes.Buffer{} +func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error { lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { return masterClient.LookupFileId(fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer))) + + idx := 0 for _, chunkView := range chunkViews { urlStrings, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return nil, err + return err } - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset) if err != nil { - return nil, err + return err } - 
buffer.Write(data) + idx += n } - return buffer.Bytes(), nil + return nil } // ---------------- ChunkStreamReader ----------------------------------
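
The read-path changes above (retriedFetchChunkData, fetchChunkRange, ReadAll) share one refactor: instead of returning a freshly allocated []byte per fetch, each function now fills a caller-owned buffer and returns the number of bytes written. A minimal self-contained sketch of that contract follows; fetchInto stands in for the new retriedFetchChunkData, and the segments slice stands in for the data callbacks delivered by util.ReadUrlAsStream (both names are illustrative, not the actual API):

package main

import "fmt"

// fetchInto copies streamed segments into a caller-owned buffer and reports
// how many bytes were written, mirroring the new
// retriedFetchChunkData(buffer []byte, ...) (n int, err error) contract.
func fetchInto(buffer []byte, segments [][]byte) (n int) {
	for _, data := range segments {
		if n < len(buffer) {
			n += copy(buffer[n:], data) // fill in place; never grow or reallocate
		}
	}
	return n
}

func main() {
	buf := make([]byte, 8) // the caller sizes this, e.g. from chunkView.Size
	n := fetchInto(buf, [][]byte{[]byte("hello "), []byte("world")})
	fmt.Printf("read %d bytes: %q\n", n, buf[:n]) // read 8 bytes: "hello wo"
}

The design point is that buffer size, reuse, and lifetime all move to the caller (ChunkReadAt, ReaderCache), which is what lets the new code pool and recycle memory across reads.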

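Relatedly, ResolveOneChunkManifest now borrows a bytes.Buffer from the package-level bytesBufferPool instead of allocating a new slice for every manifest read. A sketch of the same pooling pattern under stated assumptions — readManifest is a hypothetical stand-in for the fetchWholeChunk call site, not the real function:

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// A pool of reusable buffers, as bytesBufferPool in filechunk_manifest.go.
var bufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func readManifest(payload []byte) []byte {
	b := bufferPool.Get().(*bytes.Buffer)
	b.Reset()               // a pooled buffer may still hold a previous read
	defer bufferPool.Put(b) // return it to the pool when done
	b.Write(payload)        // stands in for fetchWholeChunk filling the buffer
	// Copy the bytes out before Put: the buffer will be reused by other readers.
	return append([]byte(nil), b.Bytes()...)
}

func main() {
	fmt.Printf("%s\n", readManifest([]byte("manifest bytes")))
}

The Reset-before-use and copy-before-Put steps are the two easy mistakes with sync.Pool; in the actual diff the copy is unnecessary because proto.Unmarshal consumes bytesBuffer.Bytes() before the deferred Put runs.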