Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/entry.go | 2
-rw-r--r--  weed/filer/entry_codec.go | 4
-rw-r--r--  weed/filer/filechunk_manifest.go | 51
-rw-r--r--  weed/filer/filechunks.go | 8
-rw-r--r--  weed/filer/filer.go | 12
-rw-r--r--  weed/filer/filer_delete_entry.go | 8
-rw-r--r--  weed/filer/filer_deletion.go | 35
-rw-r--r--  weed/filer/filer_hardlink.go | 16
-rw-r--r--  weed/filer/filer_notify_append.go | 2
-rw-r--r--  weed/filer/filer_on_meta_event.go | 4
-rw-r--r--  weed/filer/filerstore_hardlink.go | 19
-rw-r--r--  weed/filer/filerstore_wrapper.go | 35
-rw-r--r--  weed/filer/leveldb/leveldb_store_test.go | 11
-rw-r--r--  weed/filer/leveldb2/leveldb2_store_test.go | 9
-rw-r--r--  weed/filer/leveldb3/leveldb3_store.go | 4
-rw-r--r--  weed/filer/leveldb3/leveldb3_store_test.go | 9
-rw-r--r--  weed/filer/meta_aggregator.go | 3
-rw-r--r--  weed/filer/mongodb/mongodb_store.go | 8
-rw-r--r--  weed/filer/mysql2/mysql2_store.go | 3
-rw-r--r--  weed/filer/read_write.go | 1
-rw-r--r--  weed/filer/reader_at.go | 145
-rw-r--r--  weed/filer/reader_at_test.go | 19
-rw-r--r--  weed/filer/reader_cache.go | 192
-rw-r--r--  weed/filer/redis_lua/redis_cluster_store.go | 44
-rw-r--r--  weed/filer/redis_lua/redis_sentinel_store.go | 45
-rw-r--r--  weed/filer/redis_lua/redis_store.go | 38
-rw-r--r--  weed/filer/redis_lua/stored_procedure/delete_entry.lua | 19
-rw-r--r--  weed/filer/redis_lua/stored_procedure/delete_folder_children.lua | 15
-rw-r--r--  weed/filer/redis_lua/stored_procedure/init.go | 24
-rw-r--r--  weed/filer/redis_lua/stored_procedure/insert_entry.lua | 27
-rw-r--r--  weed/filer/redis_lua/universal_redis_store.go | 191
-rw-r--r--  weed/filer/redis_lua/universal_redis_store_kv.go | 42
-rw-r--r--  weed/filer/rocksdb/rocksdb_store_test.go | 11
-rw-r--r--  weed/filer/stream.go | 30
34 files changed, 862 insertions(+), 224 deletions(-)
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index 9f83da4aa..ddb339bfb 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -24,6 +24,8 @@ type Attr struct {
SymlinkTarget string
Md5 []byte
FileSize uint64
+ Rdev uint32
+ Inode uint64
}
func (attr Attr) IsDirectory() bool {
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index 0a917bea9..683e83cde 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -48,6 +48,8 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
SymlinkTarget: entry.Attr.SymlinkTarget,
Md5: entry.Attr.Md5,
FileSize: entry.Attr.FileSize,
+ Rdev: entry.Attr.Rdev,
+ Inode: entry.Attr.Inode,
}
}
@@ -74,6 +76,8 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.SymlinkTarget = attr.SymlinkTarget
t.Md5 = attr.Md5
t.FileSize = attr.FileSize
+ t.Rdev = attr.Rdev
+ t.Inode = attr.Inode
return t
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index b6a64b30d..a1f84b38e 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -8,6 +8,7 @@ import (
"math"
"net/url"
"strings"
+ "sync"
"time"
"github.com/golang/protobuf/proto"
@@ -21,6 +22,12 @@ const (
ManifestBatch = 10000
)
+var bytesBufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
for _, chunk := range chunks {
if chunk.IsChunkManifest {
@@ -61,12 +68,12 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
manifestChunks = append(manifestChunks, chunk)
// recursive
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
+ subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
return chunks, nil, subErr
}
- dataChunks = append(dataChunks, dchunks...)
- manifestChunks = append(manifestChunks, mchunks...)
+ dataChunks = append(dataChunks, subDataChunks...)
+ manifestChunks = append(manifestChunks, subManifestChunks...)
}
return
}
@@ -77,12 +84,15 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// IsChunkManifest
- data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
+ bytesBuffer.Reset()
+ defer bytesBufferPool.Put(bytesBuffer)
+ err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
}
m := &filer_pb.FileChunkManifest{}
- if err := proto.Unmarshal(data, m); err != nil {
+ if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
}
@@ -92,38 +102,43 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
+ return err
+ }
+ err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
+ if err != nil {
+ return err
}
- return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
+ return nil
}
-func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
+func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
+ return 0, err
}
- return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
+ return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
-func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
+func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
- var err error
var shouldRetry bool
- receivedData := make([]byte, 0, size)
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- receivedData = receivedData[:0]
+ n = 0
if strings.Contains(urlString, "%") {
urlString = url.PathEscape(urlString)
}
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
- receivedData = append(receivedData, data...)
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+ if n < len(buffer) {
+ x := copy(buffer[n:], data)
+ n += x
+ }
})
if !shouldRetry {
break
@@ -142,7 +157,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
}
}
- return receivedData, err
+ return n, err
}
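
The fetch helpers above now write into caller-supplied storage: whole chunks stream into a pooled bytes.Buffer, and ranged reads fill a fixed-size byte slice, so allocations are bounded by the number of concurrent readers rather than by the number of chunks fetched. A minimal sketch of the new caller-side contract, assuming a chunk-sized buffer and already-resolved URLs are in scope:

    // Sketch only: buffer is caller-owned and sized to the chunk; the helper
    // fills at most len(buffer) bytes and reports how many were copied.
    buffer := make([]byte, chunkView.Size)
    n, err := retriedFetchChunkData(buffer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, true, 0)
    if err != nil {
        return err
    }
    data := buffer[:n] // valid bytes; short reads are possible on truncated chunks
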
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index d18d06f2c..fd9694b38 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -23,7 +23,13 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
}
func FileSize(entry *filer_pb.Entry) (size uint64) {
- return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+ fileSize := entry.Attributes.FileSize
+ if entry.RemoteEntry != nil {
+ if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime {
+ fileSize = maxUint64(fileSize, uint64(entry.RemoteEntry.RemoteSize))
+ }
+ }
+ return maxUint64(TotalSize(entry.Chunks), fileSize)
}
func ETag(entry *filer_pb.Entry) (etag string) {
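
FileSize now folds in remote-mounted entries: when the remote copy is newer than the local mtime, its size takes precedence over the locally recorded attribute size, and the chunk total still wins if it is larger. A worked example under those rules:

    // No local chunks; the remote copy was modified after the local mtime,
    // so FileSize returns the remote size.
    entry := &filer_pb.Entry{
        Attributes:  &filer_pb.FuseAttributes{FileSize: 0, Mtime: 100},
        RemoteEntry: &filer_pb.RemoteEntry{RemoteMtime: 200, RemoteSize: 4096},
    }
    size := FileSize(entry) // 4096
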
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 0f34adb4d..836a0e447 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -49,7 +49,7 @@ type Filer struct {
UniqueFileId uint32
}
-func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
+func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption,
filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters),
@@ -151,7 +151,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.Store.RollbackTransaction(ctx)
}
-func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool) error {
if string(entry.FullPath) == "/" {
return nil
@@ -169,9 +169,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
if oldEntry == nil {
- dirParts := strings.Split(string(entry.FullPath), "/")
- if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
- return err
+ if !skipCreateParentDir {
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
}
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
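
CreateEntry gains a skipCreateParentDir flag so callers that already know the parent directory exists can skip the per-component ensureParentDirecotryEntry walk; every existing call site in this diff passes false. A sketch of the fast path:

    // The parent directory was just created above, so skip the walk.
    if err := f.CreateEntry(ctx, entry, false, false, nil, true); err != nil {
        return err
    }
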
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index bda69b15f..c774f5d27 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -9,8 +9,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-type HardLinkId []byte
-
const (
MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
)
@@ -127,7 +125,11 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
- if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
+ if !entry.IsDirectory() && !shouldDeleteChunks {
+ if storeDeletionErr := f.Store.DeleteOneEntrySkipHardlink(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete skip hardlink: %v", storeDeletionErr)
+ }
+ } else if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
if !entry.IsDirectory() {
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index e0191d7f1..e73f94151 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -1,6 +1,7 @@
package filer
import (
+ "math"
"strings"
"time"
@@ -129,6 +130,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
}
}
+func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
+ for _, chunk := range chunks {
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ }
+}
+
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
@@ -141,14 +148,36 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
var toDelete []*filer_pb.FileChunk
newChunkIds := make(map[string]bool)
- for _, newChunk := range newEntry.Chunks {
+ newDataChunks, newManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ newEntry.Chunks, 0, math.MaxInt64)
+ if err != nil {
+ glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, newChunk := range newDataChunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
+ for _, newChunk := range newManifestChunks {
newChunkIds[newChunk.GetFileIdString()] = true
}
- for _, oldChunk := range oldEntry.Chunks {
+ oldDataChunks, oldManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ oldEntry.Chunks, 0, math.MaxInt64)
+ if err != nil {
+ glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, oldChunk := range oldDataChunks {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
+ toDelete = append(toDelete, oldChunk)
+ }
+ }
+ for _, oldChunk := range oldManifestChunks {
if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(toDelete)
+ f.DeleteChunksNotRecursive(toDelete)
}
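
deleteChunksIfNotNew now resolves manifest chunks on both the old and the new entry before diffing, and queues the leftovers with DeleteChunksNotRecursive: the manifests were already expanded here, so expanding them again during deletion would double-count. The loops above amount to this set difference, sketched for clarity:

    // Equivalent set difference: delete whatever the old entry references
    // (data and manifest chunks alike) that the new entry does not.
    keep := make(map[string]bool)
    for _, c := range append(newDataChunks, newManifestChunks...) {
        keep[c.GetFileIdString()] = true
    }
    for _, c := range append(oldDataChunks, oldManifestChunks...) {
        if !keep[c.GetFileIdString()] {
            toDelete = append(toDelete, c)
        }
    }
    f.DeleteChunksNotRecursive(toDelete)
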
diff --git a/weed/filer/filer_hardlink.go b/weed/filer/filer_hardlink.go
new file mode 100644
index 000000000..7a91602fd
--- /dev/null
+++ b/weed/filer/filer_hardlink.go
@@ -0,0 +1,16 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ HARD_LINK_MARKER = '\x01'
+)
+
+type HardLinkId []byte // 16 bytes + 1 marker byte
+
+func NewHardLinkId() HardLinkId {
+ bytes := append(util.RandomBytes(16), HARD_LINK_MARKER)
+ return bytes
+}
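
A hard-link id is now 17 bytes, 16 random bytes plus a trailing marker, which makes new-format ids distinguishable from legacy 16-byte ones. A hypothetical check, not part of this diff:

    // Hypothetical helper: detect a marker-suffixed hard-link id.
    func isNewFormatHardLinkId(id HardLinkId) bool {
        return len(id) == 17 && id[len(id)-1] == HARD_LINK_MARKER
    }
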
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index e30ef4e54..25b99d0f7 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -43,7 +43,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
// update the entry
- err = f.CreateEntry(context.Background(), entry, false, false, nil)
+ err = f.CreateEntry(context.Background(), entry, false, false, nil, false)
return err
}
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index 720e019f4..3b290deca 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -22,12 +22,12 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
}
}
if f.DirBucketsPath == event.Directory {
- if message.OldEntry == nil && message.NewEntry != nil {
+ if filer_pb.IsCreate(event) {
if message.NewEntry.IsDirectory {
f.Store.OnBucketCreation(message.NewEntry.Name)
}
}
- if message.OldEntry != nil && message.NewEntry == nil {
+ if filer_pb.IsDelete(event) {
if message.OldEntry.IsDirectory {
f.Store.OnBucketDeletion(message.OldEntry.Name)
}
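
The bucket hooks now go through the filer_pb event helpers instead of open-coded nil checks. The assumed shape of those helpers, for reference (a create carries only a new entry, a delete only an old one):

    // Assumed sketch of the filer_pb helpers used above; the real package may
    // additionally account for renames via the event's new parent path.
    func IsCreate(event *filer_pb.SubscribeMetadataResponse) bool {
        return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
    }

    func IsDelete(event *filer_pb.SubscribeMetadataResponse) bool {
        return event.EventNotification.OldEntry != nil && event.EventNotification.NewEntry == nil
    }
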
diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go
index 316c76a0c..ae2f5fee7 100644
--- a/weed/filer/filerstore_hardlink.go
+++ b/weed/filer/filerstore_hardlink.go
@@ -9,16 +9,20 @@ import (
)
func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
- if len(entry.HardLinkId) == 0 {
+
+ if entry.IsDirectory() {
return nil
}
- // handle hard links
- if err := fsw.setHardLink(ctx, entry); err != nil {
- return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
+
+ if len(entry.HardLinkId) > 0 {
+ // handle hard links
+ if err := fsw.setHardLink(ctx, entry); err != nil {
+ return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
+ }
}
// check what is existing entry
- glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
+ // glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
actualStore := fsw.getActualStore(entry.FullPath)
existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound {
@@ -46,6 +50,8 @@ func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) err
return encodeErr
}
+ glog.V(4).Infof("setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
+
return fsw.KvPut(ctx, key, newBlob)
}
@@ -55,7 +61,6 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr
}
key := entry.HardLinkId
- glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
value, err := fsw.KvGet(ctx, key)
if err != nil {
glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
@@ -67,6 +72,8 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr
return err
}
+ glog.V(4).Infof("maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
+
return nil
}
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index ca531dc3a..7f5d9729d 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -23,6 +23,7 @@ type VirtualFilerStore interface {
FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
+ DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error
AddPathSpecificStore(path string, storeId string, store FilerStore)
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
@@ -127,7 +128,7 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err
return err
}
- glog.V(4).Infof("InsertEntry %s", entry.FullPath)
+ // glog.V(4).Infof("InsertEntry %s", entry.FullPath)
return actualStore.InsertEntry(ctx, entry)
}
@@ -148,7 +149,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
return err
}
- glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
+ // glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
return actualStore.UpdateEntry(ctx, entry)
}
@@ -192,7 +193,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
}
}
- glog.V(4).Infof("DeleteEntry %s", fp)
+ // glog.V(4).Infof("DeleteEntry %s", fp)
return actualStore.DeleteEntry(ctx, fp)
}
@@ -212,10 +213,22 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
}
}
- glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
+ // glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
+func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fullpath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath)
+ return actualStore.DeleteEntry(ctx, fullpath)
+}
+
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
actualStore := fsw.getActualStore(fp + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
@@ -224,7 +237,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
- glog.V(4).Infof("DeleteFolderChildren %s", fp)
+ // glog.V(4).Infof("DeleteFolderChildren %s", fp)
return actualStore.DeleteFolderChildren(ctx, fp)
}
@@ -236,7 +249,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
- glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
+ // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
@@ -254,7 +267,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
if limit > math.MaxInt32-1 {
limit = math.MaxInt32 - 1
}
- glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
+ // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
adjustedEntryFunc := func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
@@ -285,8 +298,10 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
count := int64(0)
for count < limit && len(notPrefixed) > 0 {
+ var isLastItemHasPrefix bool
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
+ isLastItemHasPrefix = true
count++
if !eachEntryFunc(entry) {
return
@@ -294,9 +309,11 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
if count >= limit {
break
}
+ } else {
+ isLastItemHasPrefix = false
}
}
- if count < limit {
+ if count < limit && isLastItemHasPrefix && len(notPrefixed) == int(limit) {
notPrefixed = notPrefixed[:0]
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
notPrefixed = append(notPrefixed, entry)
@@ -305,6 +322,8 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
if err != nil {
return
}
+ } else {
+ break
}
}
return
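
The prefix listing above also gets a tighter termination rule: it pages again only when the previous batch was full and its last examined name still matched the prefix. Entries arrive in lexical order, so once a name past the prefix range shows up no later page can match, and a short batch means the directory is exhausted. The rule, condensed into an assumed helper that is not in the diff:

    // Page again only while there may be more matches: the last batch was
    // full and its final examined name still carried the prefix.
    func shouldPageAgain(count, limit int64, batchLen int, lastHadPrefix bool) bool {
        return count < limit && lastHadPrefix && batchLen == int(limit)
    }
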
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index 2476e063c..2496add4b 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -13,8 +13,7 @@ import (
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -32,7 +31,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -67,8 +66,7 @@ func TestCreateAndFind(t *testing.T) {
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -90,8 +88,7 @@ func TestEmptyRoot(t *testing.T) {
func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_bench")
- defer os.RemoveAll(dir)
+ dir := b.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index 93c622fd9..f04ffc049 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -2,7 +2,6 @@ package leveldb
import (
"context"
- "os"
"testing"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -11,8 +10,7 @@ import (
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &LevelDB2Store{}
store.initialize(dir, 2)
testFiler.SetStore(store)
@@ -30,7 +28,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -65,8 +63,7 @@ func TestCreateAndFind(t *testing.T) {
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &LevelDB2Store{}
store.initialize(dir, 2)
testFiler.SetStore(store)
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index 86e2b584b..e448f0093 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -72,8 +72,8 @@ func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
}
if name != DEFAULT {
opts = &opt.Options{
- BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
+ BlockCacheCapacity: 16 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 8 * 1024 * 1024, // default value is 4MiB
Filter: bloom,
}
}
diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go
index a5e97cf10..dcd390a0c 100644
--- a/weed/filer/leveldb3/leveldb3_store_test.go
+++ b/weed/filer/leveldb3/leveldb3_store_test.go
@@ -2,7 +2,6 @@ package leveldb
import (
"context"
- "os"
"testing"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -11,8 +10,7 @@ import (
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &LevelDB3Store{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -30,7 +28,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -65,8 +63,7 @@ func TestCreateAndFind(t *testing.T) {
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &LevelDB3Store{}
store.initialize(dir)
testFiler.SetStore(store)
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 1e8b89ad5..83c8a945d 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -76,9 +76,6 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (no
}
} else {
if _, found := ma.peerStatues[address]; found {
- ma.peerStatues[address] -= 1
- }
- if ma.peerStatues[address] <= 0 {
delete(ma.peerStatues, address)
}
}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 6935be1ab..c12354ad6 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -159,7 +159,7 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa
dir, name := fullpath.DirAndName()
where := bson.M{"directory": dir, "name": name}
- _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
+ _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@@ -199,9 +199,9 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti
for cur.Next(ctx) {
var data Model
- err := cur.Decode(&data)
- if err != nil && err != mongo.ErrNoDocuments {
- return lastFileName, err
+ err = cur.Decode(&data)
+ if err != nil {
+ break
}
entry := &filer.Entry{
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
index a1f54455a..e50480150 100644
--- a/weed/filer/mysql2/mysql2_store.go
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -82,7 +83,7 @@ func (store *MysqlStore2) initialize(createTable, upsertQuery string, enableUpse
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
}
- if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
+ if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil && !strings.Contains(err.Error(), "table already exist") {
return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
}
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index 3b6a69fb6..19d98e99e 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -62,6 +62,7 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte
},
Content: content,
},
+ SkipCheckParentDirectory: true,
})
} else if err == nil {
entry := resp.Entry
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b1c15152f..7d9997761 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -12,21 +12,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/golang/groupcache/singleflight"
)
type ChunkReadAt struct {
- masterClient *wdclient.MasterClient
- chunkViews []*ChunkView
- lookupFileId wdclient.LookupFileIdFunctionType
- readerLock sync.Mutex
- fileSize int64
-
- fetchGroup singleflight.Group
- chunkCache chunk_cache.ChunkCache
- lastChunkFileId string
- lastChunkData []byte
- readerPattern *ReaderPattern
+ masterClient *wdclient.MasterClient
+ chunkViews []*ChunkView
+ readerLock sync.Mutex
+ fileSize int64
+ readerCache *ReaderCache
+ readerPattern *ReaderPattern
+ lastChunkFid string
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -90,16 +85,14 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun
return &ChunkReadAt{
chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
fileSize: fileSize,
+ readerCache: newReaderCache(32, chunkCache, lookupFn),
readerPattern: NewReaderPattern(),
}
}
func (c *ChunkReadAt) Close() error {
- c.lastChunkData = nil
- c.lastChunkFileId = ""
+ c.readerCache.destroy()
return nil
}
@@ -117,15 +110,13 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
startOffset, remaining := offset, int64(len(p))
- var nextChunk *ChunkView
+ var nextChunks []*ChunkView
for i, chunk := range c.chunkViews {
if remaining <= 0 {
break
}
if i+1 < len(c.chunkViews) {
- nextChunk = c.chunkViews[i+1]
- } else {
- nextChunk = nil
+ nextChunks = c.chunkViews[i+1:]
}
if startOffset < chunk.LogicOffset {
gap := int(chunk.LogicOffset - startOffset)
@@ -142,16 +133,13 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
}
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- var buffer []byte
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
- bufferLength := chunkStop - chunkStart
- buffer, err = c.readChunkSlice(chunk, nextChunk, uint64(bufferOffset), uint64(bufferLength))
+ copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return
+ return copied, err
}
- copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer)
n += copied
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
}
@@ -173,104 +161,25 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
+func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) {
- var chunkSlice []byte
- if chunkView.LogicOffset == 0 {
- chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
- }
- if len(chunkSlice) > 0 {
- return chunkSlice, nil
- }
- if c.lookupFileId == nil {
- return nil, nil
- }
if c.readerPattern.IsRandomMode() {
- return c.doFetchRangeChunkData(chunkView, offset, length)
- }
- chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews)
- if err != nil {
- return nil, err
- }
- wanted := min(int64(length), int64(len(chunkData))-int64(offset))
- return chunkData[offset : int64(offset)+wanted], nil
-}
-
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
-
- if c.lastChunkFileId == chunkView.FileId {
- return c.lastChunkData, nil
- }
-
- v, doErr := c.readOneWholeChunk(chunkView)
-
- if doErr != nil {
- return nil, doErr
+ return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
}
- chunkData = v.([]byte)
-
- c.lastChunkData = chunkData
- c.lastChunkFileId = chunkView.FileId
-
- for _, nextChunkView := range nextChunkViews {
- if c.chunkCache != nil && nextChunkView != nil {
- go c.readOneWholeChunk(nextChunkView)
+ n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
+ if c.lastChunkFid != chunkView.FileId {
+ if chunkView.Offset == 0 { // start of a new chunk
+ if c.lastChunkFid != "" {
+ c.readerCache.UnCache(c.lastChunkFid)
+ c.readerCache.MaybeCache(nextChunkViews)
+ } else {
+ if len(nextChunkViews) >= 1 {
+ c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning
+ }
+ }
}
}
-
+ c.lastChunkFid = chunkView.FileId
return
}
-
-func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
-
- var err error
-
- return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
-
- glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
-
- var data []byte
- if chunkView.LogicOffset == 0 {
- data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
- }
- if data != nil {
- glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
- } else {
- var err error
- data, err = c.doFetchFullChunkData(chunkView)
- if err != nil {
- return data, err
- }
- if chunkView.LogicOffset == 0 {
- // only cache the first chunk
- c.chunkCache.SetChunk(chunkView.FileId, data)
- }
- }
- return data, err
- })
-}
-
-func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
-
- glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
- data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
-
- glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
- return data, err
-
-}
-
-func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
-
- glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
- data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
-
- glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
- return data, err
-
-}
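
The rewritten read path replaces the single cached last chunk and the singleflight group with a ReaderCache shared across reads: random-mode reads bypass it and fetch only the requested range, while sequential reads fill from cached whole chunks, evict the chunk just passed, and prefetch the upcoming ones. Construction mirrors NewChunkReaderAtFromClient above; a sketch, assuming chunkViews, fileSize, chunkCache, and lookupFn are in scope:

    readerAt := &ChunkReadAt{
        chunkViews:    chunkViews,
        fileSize:      fileSize,
        readerCache:   newReaderCache(32, chunkCache, lookupFn), // up to 32 in-flight chunks
        readerPattern: NewReaderPattern(),
    }
    defer readerAt.Close() // frees pooled chunk buffers via readerCache.destroy()
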
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
index 411d7eb3b..d9afb460c 100644
--- a/weed/filer/reader_at_test.go
+++ b/weed/filer/reader_at_test.go
@@ -21,8 +21,12 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
return data
}
-func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
- return m.GetChunk(fileId, length)
+func (m *mockChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+ x, _ := strconv.Atoi(fileId)
+ for i := 0; i < len(data); i++ {
+ data[i] = byte(x)
+ }
+ return len(data), nil
}
func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
@@ -65,10 +69,9 @@ func TestReaderAt(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
readerLock: sync.Mutex{},
fileSize: 10,
- chunkCache: &mockChunkCache{},
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
}
@@ -81,7 +84,7 @@ func TestReaderAt(t *testing.T) {
func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) {
data := make([]byte, size)
- n, err := readerAt.ReadAt(data, offset)
+ n, err := readerAt.doReadAt(data, offset)
for _, d := range data {
fmt.Printf("%x", d)
@@ -116,10 +119,9 @@ func TestReaderAt0(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
readerLock: sync.Mutex{},
fileSize: 10,
- chunkCache: &mockChunkCache{},
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
}
@@ -145,10 +147,9 @@ func TestReaderAt1(t *testing.T) {
readerAt := &ChunkReadAt{
chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
readerLock: sync.Mutex{},
fileSize: 20,
- chunkCache: &mockChunkCache{},
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
readerPattern: NewReaderPattern(),
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
new file mode 100644
index 000000000..bce97cc49
--- /dev/null
+++ b/weed/filer/reader_cache.go
@@ -0,0 +1,192 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "sync"
+ "time"
+)
+
+type ReaderCache struct {
+ chunkCache chunk_cache.ChunkCache
+ lookupFileIdFn wdclient.LookupFileIdFunctionType
+ sync.Mutex
+ downloaders map[string]*SingleChunkCacher
+ limit int
+}
+
+type SingleChunkCacher struct {
+ sync.RWMutex
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ completedTime time.Time
+}
+
+func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
+ return &ReaderCache{
+ limit: limit,
+ chunkCache: chunkCache,
+ lookupFileIdFn: lookupFileIdFn,
+ downloaders: make(map[string]*SingleChunkCacher),
+ }
+}
+
+func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
+ if rc.lookupFileIdFn == nil {
+ return
+ }
+
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, chunkView := range chunkViews {
+ if _, found := rc.downloaders[chunkView.FileId]; found {
+ continue
+ }
+
+ if len(rc.downloaders) >= rc.limit {
+ // if still no slots, return
+ return
+ }
+
+ // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
+ // cache this chunk if not yet
+ cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[chunkView.FileId] = cacher
+
+ }
+
+ return
+}
+
+func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+ rc.Lock()
+ defer rc.Unlock()
+ if cacher, found := rc.downloaders[fileId]; found {
+ return cacher.readChunkAt(buffer, offset)
+ }
+ if shouldCache || rc.lookupFileIdFn == nil {
+ n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
+ if n > 0 {
+ return n, err
+ }
+ }
+
+ if len(rc.downloaders) >= rc.limit {
+ oldestFid, oldestTime := "", time.Now()
+ for fid, downloader := range rc.downloaders {
+ if !downloader.completedTime.IsZero() {
+ if downloader.completedTime.Before(oldestTime) {
+ oldestFid, oldestTime = fid, downloader.completedTime
+ }
+ }
+ }
+ if oldestFid != "" {
+ oldDownloader := rc.downloaders[oldestFid]
+ delete(rc.downloaders, oldestFid)
+ oldDownloader.destroy()
+ }
+ }
+
+ // glog.V(4).Infof("cache1 %s", fileId)
+
+ cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[fileId] = cacher
+
+ return cacher.readChunkAt(buffer, offset)
+}
+
+func (rc *ReaderCache) UnCache(fileId string) {
+ rc.Lock()
+ defer rc.Unlock()
+ // glog.V(4).Infof("uncache %s", fileId)
+ if downloader, found := rc.downloaders[fileId]; found {
+ downloader.destroy()
+ delete(rc.downloaders, fileId)
+ }
+}
+
+func (rc *ReaderCache) destroy() {
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, downloader := range rc.downloaders {
+ downloader.destroy()
+ }
+
+}
+
+func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
+ t := &SingleChunkCacher{
+ parent: parent,
+ chunkFileId: fileId,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ chunkSize: chunkSize,
+ shouldCache: shouldCache,
+ }
+ return t
+}
+
+func (s *SingleChunkCacher) startCaching() {
+ s.Lock()
+ defer s.Unlock()
+
+ s.wg.Done() // means this has been started
+
+ urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
+ if err != nil {
+ s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+ return
+ }
+
+ s.data = mem.Allocate(s.chunkSize)
+
+ _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ if s.err != nil {
+ mem.Free(s.data)
+ s.data = nil
+ return
+ }
+
+ s.completedTime = time.Now()
+ if s.shouldCache {
+ s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ }
+
+ return
+}
+
+func (s *SingleChunkCacher) destroy() {
+ if s.data != nil {
+ mem.Free(s.data)
+ s.data = nil
+ }
+}
+
+func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+ s.RLock()
+ defer s.RUnlock()
+
+ if s.err != nil {
+ return 0, s.err
+ }
+
+ return copy(buf, s.data[offset:]), nil
+
+}
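
Each SingleChunkCacher doubles as a single-flight guard: startCaching holds the write lock for the duration of the download, so concurrent readChunkAt calls block on the read lock until the data (or error) is ready, and the wg.Wait in the spawner only waits for the goroutine to start, not to finish. Expected usage from a sequential reader, assuming buf and the chunk views are in scope:

    rc := newReaderCache(32, chunkCache, lookupFn)
    defer rc.destroy()

    rc.MaybeCache(nextChunkViews) // best-effort prefetch, bounded by the limit
    n, err := rc.ReadChunkAt(buf, chunkView.FileId, chunkView.CipherKey,
        chunkView.IsGzipped, 0, int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
    if err != nil {
        return n, err
    }
    rc.UnCache(previousFileId) // release a chunk the reader has moved past
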
diff --git a/weed/filer/redis_lua/redis_cluster_store.go b/weed/filer/redis_lua/redis_cluster_store.go
new file mode 100644
index 000000000..b68d1092c
--- /dev/null
+++ b/weed/filer/redis_lua/redis_cluster_store.go
@@ -0,0 +1,44 @@
+package redis_lua
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisLuaClusterStore{})
+}
+
+type RedisLuaClusterStore struct {
+ UniversalRedisLuaStore
+}
+
+func (store *RedisLuaClusterStore) GetName() string {
+ return "redis_lua_cluster"
+}
+
+func (store *RedisLuaClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
+ )
+}
+
+func (store *RedisLuaClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ store.loadSuperLargeDirectories(superLargeDirectories)
+ return
+}
diff --git a/weed/filer/redis_lua/redis_sentinel_store.go b/weed/filer/redis_lua/redis_sentinel_store.go
new file mode 100644
index 000000000..5530c098e
--- /dev/null
+++ b/weed/filer/redis_lua/redis_sentinel_store.go
@@ -0,0 +1,45 @@
+package redis_lua
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "time"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisLuaSentinelStore{})
+}
+
+type RedisLuaSentinelStore struct {
+ UniversalRedisLuaStore
+}
+
+func (store *RedisLuaSentinelStore) GetName() string {
+ return "redis_lua_sentinel"
+}
+
+func (store *RedisLuaSentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"masterName"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
+ store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: masterName,
+ SentinelAddrs: addresses,
+ Username: username,
+ Password: password,
+ DB: database,
+ MinRetryBackoff: time.Millisecond * 100,
+ MaxRetryBackoff: time.Minute * 1,
+ ReadTimeout: time.Second * 30,
+ WriteTimeout: time.Second * 5,
+ })
+ return
+}
diff --git a/weed/filer/redis_lua/redis_store.go b/weed/filer/redis_lua/redis_store.go
new file mode 100644
index 000000000..a7d11c73c
--- /dev/null
+++ b/weed/filer/redis_lua/redis_store.go
@@ -0,0 +1,38 @@
+package redis_lua
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisLuaStore{})
+}
+
+type RedisLuaStore struct {
+ UniversalRedisLuaStore
+}
+
+func (store *RedisLuaStore) GetName() string {
+ return "redis_lua"
+}
+
+func (store *RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
+ )
+}
+
+func (store *RedisLuaStore) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ store.loadSuperLargeDirectories(superLargeDirectories)
+ return
+}
diff --git a/weed/filer/redis_lua/stored_procedure/delete_entry.lua b/weed/filer/redis_lua/stored_procedure/delete_entry.lua
new file mode 100644
index 000000000..445337c77
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/delete_entry.lua
@@ -0,0 +1,19 @@
+-- KEYS[1]: full path of entry
+local fullpath = KEYS[1]
+-- KEYS[2]: directory list key of the entry itself
+local fullpath_list_key = KEYS[2]
+-- KEYS[3]: dir of the entry
+local dir_list_key = KEYS[3]
+
+-- ARGV[1]: isSuperLargeDirectory
+local isSuperLargeDirectory = ARGV[1] == "1"
+-- ARGV[2]: name of the entry
+local name = ARGV[2]
+
+redis.call("DEL", fullpath, fullpath_list_key)
+
+if not isSuperLargeDirectory and name ~= "" then
+ redis.call("ZREM", dir_list_key, name)
+end
+
+return 0
\ No newline at end of file
diff --git a/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua
new file mode 100644
index 000000000..77e4839f9
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua
@@ -0,0 +1,15 @@
+-- KEYS[1]: full path of entry
+local fullpath = KEYS[1]
+
+if fullpath ~= "" and string.sub(fullpath, -1) == "/" then
+ fullpath = string.sub(fullpath, 0, -2)
+end
+
+local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1")
+
+for _, name in ipairs(files) do
+ local file_path = fullpath .. "/" .. name
+ redis.call("DEL", file_path, file_path .. "\0")
+end
+
+return 0
\ No newline at end of file
diff --git a/weed/filer/redis_lua/stored_procedure/init.go b/weed/filer/redis_lua/stored_procedure/init.go
new file mode 100644
index 000000000..1412ceba2
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/init.go
@@ -0,0 +1,24 @@
+package stored_procedure
+
+import (
+ _ "embed"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ InsertEntryScript = redis.NewScript(insertEntry)
+ DeleteEntryScript = redis.NewScript(deleteEntry)
+ DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren)
+}
+
+//go:embed insert_entry.lua
+var insertEntry string
+var InsertEntryScript *redis.Script
+
+//go:embed delete_entry.lua
+var deleteEntry string
+var DeleteEntryScript *redis.Script
+
+//go:embed delete_folder_children.lua
+var deleteFolderChildren string
+var DeleteFolderChildrenScript *redis.Script
diff --git a/weed/filer/redis_lua/stored_procedure/insert_entry.lua b/weed/filer/redis_lua/stored_procedure/insert_entry.lua
new file mode 100644
index 000000000..8deef3446
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/insert_entry.lua
@@ -0,0 +1,27 @@
+-- KEYS[1]: full path of entry
+local full_path = KEYS[1]
+-- KEYS[2]: dir of the entry
+local dir_list_key = KEYS[2]
+
+-- ARGV[1]: content of the entry
+local entry = ARGV[1]
+-- ARGV[2]: TTL of the entry
+local ttlSec = tonumber(ARGV[2])
+-- ARGV[3]: isSuperLargeDirectory
+local isSuperLargeDirectory = ARGV[3] == "1"
+-- ARGV[4]: zscore of the entry in zset
+local zscore = tonumber(ARGV[4])
+-- ARGV[5]: name of the entry
+local name = ARGV[5]
+
+if ttlSec > 0 then
+ redis.call("SET", full_path, entry, "EX", ttlSec)
+else
+ redis.call("SET", full_path, entry)
+end
+
+if not isSuperLargeDirectory and name ~= "" then
+ redis.call("ZADD", dir_list_key, "NX", zscore, name)
+end
+
+return 0
\ No newline at end of file
diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go
new file mode 100644
index 000000000..9674ac03f
--- /dev/null
+++ b/weed/filer/redis_lua/universal_redis_store.go
@@ -0,0 +1,191 @@
+package redis_lua
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/redis_lua/stored_procedure"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedisLuaStore struct {
+ Client redis.UniversalClient
+ superLargeDirectoryHash map[string]bool
+}
+
+func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
+ _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) {
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]bool)
+ for _, dir := range superLargeDirectories {
+ store.superLargeDirectoryHash[dir] = true
+ }
+}
+
+func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+
+ err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
+ []string{string(entry.FullPath), genDirectoryListKey(dir)},
+ value, entry.TtlSec,
+ store.isSuperLargeDirectory(dir), 0, name).Err()
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ dir, name := fullpath.DirAndName()
+
+ err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
+ []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
+ store.isSuperLargeDirectory(dir), name).Err()
+
+ if err != nil {
+ return fmt.Errorf("DeleteEntry %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ if store.isSuperLargeDirectory(string(fullpath)) {
+ return nil
+ }
+
+ err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
+ []string{string(fullpath)}).Err()
+
+ if err != nil {
+ return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ dirListKey := genDirectoryListKey(string(dirPath))
+
+ min := "-"
+ if startFileName != "" {
+ if includeStartFile {
+ min = "[" + startFileName
+ } else {
+ min = "(" + startFileName
+ }
+ }
+
+ members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
+ Min: min,
+ Max: "+",
+ Offset: 0,
+ Count: limit,
+ }).Result()
+ if err != nil {
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
+ }
+
+ // fetch entry meta
+ for _, fileName := range members {
+ path := util.NewFullPath(string(dirPath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.DeleteEntry(ctx, path)
+ continue
+ }
+ }
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ }
+
+ return lastFileName, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedisLuaStore) Shutdown() {
+ store.Client.Close()
+}
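
Compared with the plain redis stores, every mutation here runs as one server-side Lua script, so the entry SET and the directory-list ZADD/ZREM commit atomically and a crash between them cannot strand a directory listing. go-redis's Script.Run first tries EVALSHA and falls back to EVAL on NOSCRIPT; a minimal sketch of that pattern with a hypothetical script:

    // Hypothetical two-command script: the SET and ZADD either both run or
    // neither does, mirroring how insert_entry.lua pairs them above.
    var setAndIndex = redis.NewScript(`
    redis.call("SET", KEYS[1], ARGV[1])
    redis.call("ZADD", KEYS[2], 0, ARGV[2])
    return 0
    `)

    err := setAndIndex.Run(ctx, store.Client,
        []string{"/d/f", "/d" + DIR_LIST_MARKER}, "entry-bytes", "f").Err()
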
diff --git a/weed/filer/redis_lua/universal_redis_store_kv.go b/weed/filer/redis_lua/universal_redis_store_kv.go
new file mode 100644
index 000000000..3df980b66
--- /dev/null
+++ b/weed/filer/redis_lua/universal_redis_store_kv.go
@@ -0,0 +1,42 @@
+package redis_lua
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/go-redis/redis/v8"
+)
+
+func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ data, err := store.Client.Get(ctx, string(key)).Result()
+
+ if err == redis.Nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return []byte(data), err
+}
+
+func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ _, err = store.Client.Del(ctx, string(key)).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
index fbf8b3112..faabcd341 100644
--- a/weed/filer/rocksdb/rocksdb_store_test.go
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -16,8 +16,7 @@ import (
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -35,7 +34,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -70,8 +69,7 @@ func TestCreateAndFind(t *testing.T) {
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -93,8 +91,7 @@ func TestEmptyRoot(t *testing.T) {
func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := os.MkdirTemp("", "seaweedfs_filer_bench")
- defer os.RemoveAll(dir)
+ dir := b.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index e5163f2d9..36278f0b1 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -62,7 +62,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
+ glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -104,10 +104,12 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
}
- glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
- err := writeZero(writer, remaining)
- if err != nil {
- return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ if remaining > 0 {
+ glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+ err := writeZero(writer, remaining)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ }
}
return nil
@@ -133,30 +135,30 @@ func writeZero(w io.Writer, size int64) (err error) {
return
}
-func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
-
- buffer := bytes.Buffer{}
+func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+
+ idx := 0
for _, chunkView := range chunkViews {
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return nil, err
+ return err
}
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
if err != nil {
- return nil, err
+ return err
}
- buffer.Write(data)
+ idx += n
}
- return buffer.Bytes(), nil
+ return nil
}
// ---------------- ChunkStreamReader ----------------------------------
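
ReadAll's new contract matches the other fetch helpers: the caller allocates a buffer for the whole read and the function fills it in place, returning only an error. A sketch, assuming an entry and master client are in scope:

    // Size the buffer from the entry, then fill it; chunk views are clipped
    // to len(buf), so a short buffer simply reads a prefix.
    buf := make([]byte, FileSize(entry))
    if err := ReadAll(buf, masterClient, entry.Chunks); err != nil {
        return err
    }
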