Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/filechunk_manifest.go           |  9
-rw-r--r--  weed/filer/mongodb/mongodb_store.go        |  4
-rw-r--r--  weed/filer/reader_at.go                    | 43
-rw-r--r--  weed/filer/reader_pattern.go               | 31
-rw-r--r--  weed/filer/redis3/redis_sentinel_store.go  | 49
5 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 32008271b..b6a64b30d 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -101,6 +101,15 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string,
return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
}
+func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
+ urlStrings, err := lookupFileIdFn(fileId)
+ if err != nil {
+ glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+ return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
+}
+
func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
var err error
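The new fetchChunkRange mirrors fetchChunk but passes isFullChunk=false together with an explicit offset and size, so retriedFetchChunkData only requests the bytes the caller needs instead of the whole chunk. A minimal sketch of the two call shapes inside package filer (the file id, cipher key, and byte range here are hypothetical):

	// whole chunk: used by the streaming path; the result can be cached
	whole, err := fetchChunk(lookupFn, "3,01637037d6", cipherKey, false)

	// range only: used by the random-read path; fetches just [4096, 4096+1024)
	part, err := fetchChunkRange(lookupFn, "3,01637037d6", cipherKey, false, 4096, 1024)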
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 1ef5056f4..6935be1ab 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -193,6 +193,10 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti
optLimit := int64(limit)
opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
+ if err != nil {
+ return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
+ }
+
for cur.Next(ctx) {
var data Model
err := cur.Decode(&data)
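Without this guard, a failed Find returns a nil cursor, the query error is silently dropped, and the cur.Next loop panics on a nil pointer dereference. The general shape of the fix, as a generic mongo-driver sketch (collection, filter, and opts are placeholders, not the store's actual fields):

	cur, err := collection.Find(ctx, filter, opts)
	if err != nil {
		return fmt.Errorf("find: %w", err)
	}
	defer cur.Close(ctx)
	for cur.Next(ctx) {
		// decode each document here
	}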
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 458cf88be..5f58b870c 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -26,6 +26,7 @@ type ChunkReadAt struct {
chunkCache chunk_cache.ChunkCache
lastChunkFileId string
lastChunkData []byte
+ readerPattern *ReaderPattern
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
- chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
- fileSize: fileSize,
+ chunkViews: chunkViews,
+ lookupFileId: lookupFn,
+ chunkCache: chunkCache,
+ fileSize: fileSize,
+ readerPattern: NewReaderPattern(),
}
}
@@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
@@ -171,7 +175,14 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
- chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
+ if c.readerPattern.IsRandomMode() {
+ return c.doFetchRangeChunkData(chunkView, offset, length)
+ }
+
+ var chunkSlice []byte
+ if chunkView.LogicOffset == 0 {
+ chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
+ }
if len(chunkSlice) > 0 {
return chunkSlice, nil
}
@@ -217,7 +228,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro
glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
- data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ var data []byte
+ if chunkView.LogicOffset == 0 {
+ data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ }
if data != nil {
glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
} else {
@@ -226,7 +240,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro
if err != nil {
return data, err
}
- c.chunkCache.SetChunk(chunkView.FileId, data)
+ if chunkView.LogicOffset == 0 {
+ // only cache the first chunk
+ c.chunkCache.SetChunk(chunkView.FileId, data)
+ }
}
return data, err
})
@@ -243,3 +260,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error)
return data, err
}
+
+func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
+
+ glog.V(4).Infof("+ doFetchRangeChunkData %s [%d,%d)", chunkView.FileId, offset, offset+length)
+
+ data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
+
+ glog.V(4).Infof("- doFetchRangeChunkData %s", chunkView.FileId)
+
+ return data, err
+
+}
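Taken together, the reader_at.go changes give readChunkSlice three paths: random mode fetches only the requested byte range over the network, streaming mode consults the chunk cache but only for the file's first chunk (LogicOffset == 0), and everything else falls through to reading a whole chunk. A condensed sketch of the resulting control flow (paraphrased from the hunks above, not a verbatim copy):

	if c.readerPattern.IsRandomMode() {
		// random reads: fetch just [offset, offset+length)
		return c.doFetchRangeChunkData(chunkView, offset, length)
	}
	if chunkView.LogicOffset == 0 {
		// streaming reads: only the first chunk is served from the cache
		if slice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length); len(slice) > 0 {
			return slice, nil
		}
	}
	// cache miss or non-first chunk: read the whole chunk
	// (it is written back to the cache only when LogicOffset == 0)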
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
new file mode 100644
index 000000000..2bf18d141
--- /dev/null
+++ b/weed/filer/reader_pattern.go
@@ -0,0 +1,31 @@
+package filer
+
+type ReaderPattern struct {
+ isStreaming bool
+ lastReadOffset int64
+}
+
+// For streaming read: only cache the first chunk
+// For random read: only fetch the requested range, instead of the whole chunk
+
+func NewReaderPattern() *ReaderPattern {
+ return &ReaderPattern{
+ isStreaming: true,
+ lastReadOffset: 0,
+ }
+}
+
+func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
+ if rp.lastReadOffset > offset { // a backward seek marks this reader as random
+ rp.isStreaming = false
+ }
+ rp.lastReadOffset = offset // note: size is currently unused by the heuristic
+}
+
+func (rp *ReaderPattern) IsStreamingMode() bool {
+ return rp.isStreaming
+}
+
+func (rp *ReaderPattern) IsRandomMode() bool {
+ return !rp.isStreaming
+}
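Under this heuristic a reader starts in streaming mode, and the first backward seek flips it to random mode for good: nothing ever sets isStreaming back to true. A short usage sketch with hypothetical offsets:

	rp := NewReaderPattern()
	rp.MonitorReadAt(0, 4096)      // first read: streaming
	rp.MonitorReadAt(4096, 4096)   // sequential forward read: still streaming
	rp.MonitorReadAt(1024, 512)    // backward seek: random mode from here on
	fmt.Println(rp.IsRandomMode()) // true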
diff --git a/weed/filer/redis3/redis_sentinel_store.go b/weed/filer/redis3/redis_sentinel_store.go
new file mode 100644
index 000000000..a87302167
--- /dev/null
+++ b/weed/filer/redis3/redis_sentinel_store.go
@@ -0,0 +1,49 @@
+package redis3
+
+import (
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "github.com/go-redsync/redsync/v4"
+ "github.com/go-redsync/redsync/v4/redis/goredis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Redis3SentinelStore{})
+}
+
+type Redis3SentinelStore struct {
+ UniversalRedis3Store
+}
+
+func (store *Redis3SentinelStore) GetName() string {
+ return "redis3_sentinel"
+}
+
+func (store *Redis3SentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"masterName"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis3SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
+ store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: masterName,
+ SentinelAddrs: addresses,
+ Username: username,
+ Password: password,
+ DB: database,
+ MinRetryBackoff: time.Millisecond * 100,
+ MaxRetryBackoff: time.Minute * 1,
+ ReadTimeout: time.Second * 30,
+ WriteTimeout: time.Second * 5,
+ })
+ store.redsync = redsync.New(goredis.NewPool(store.Client))
+ return
+}
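Since GetName returns "redis3_sentinel" and Initialize reads each key from the configuration prefix, the store would presumably be enabled from filer.toml with a section along these lines (all values are illustrative placeholders):

	[redis3_sentinel]
	enabled = true
	addresses = ["172.22.3.7:26379", "172.22.3.8:26379", "172.22.3.9:26379"]
	masterName = "mymaster"
	username = ""
	password = ""
	database = 0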