diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 9 | ||||
| -rw-r--r-- | weed/filer/mongodb/mongodb_store.go | 4 | ||||
| -rw-r--r-- | weed/filer/reader_at.go | 43 | ||||
| -rw-r--r-- | weed/filer/reader_pattern.go | 31 | ||||
| -rw-r--r-- | weed/filer/redis3/redis_sentinel_store.go | 49 |
5 files changed, 129 insertions, 7 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 32008271b..b6a64b30d 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -101,6 +101,15 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0) } +func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) { + urlStrings, err := lookupFileIdFn(fileId) + if err != nil { + glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) + return nil, err + } + return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size) +} + func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) { var err error diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 1ef5056f4..6935be1ab 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -193,6 +193,10 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti optLimit := int64(limit) opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}} cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts) + if err != nil { + return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) + } + for cur.Next(ctx) { var data Model err := cur.Decode(&data) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 458cf88be..5f58b870c 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -26,6 +26,7 @@ type ChunkReadAt struct { chunkCache chunk_cache.ChunkCache lastChunkFileId string lastChunkData []byte + readerPattern *ReaderPattern } var _ = io.ReaderAt(&ChunkReadAt{}) @@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ - chunkViews: chunkViews, - lookupFileId: lookupFn, - chunkCache: chunkCache, - fileSize: fileSize, + chunkViews: chunkViews, + lookupFileId: lookupFn, + chunkCache: chunkCache, + fileSize: fileSize, + readerPattern: NewReaderPattern(), } } @@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { c.readerLock.Lock() defer c.readerLock.Unlock() + c.readerPattern.MonitorReadAt(offset, len(p)) + // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) return c.doReadAt(p, offset) } @@ -171,7 +175,14 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) { - chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length) + if c.readerPattern.IsRandomMode() { + return c.doFetchRangeChunkData(chunkView, offset, length) + } + + var chunkSlice []byte + if chunkView.LogicOffset == 0 { + chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length) + } if len(chunkSlice) > 0 { return chunkSlice, nil } @@ -217,7 +228,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) - data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + var data []byte + if chunkView.LogicOffset == 0 { + data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + } if data != nil { glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data))) } else { @@ -226,7 +240,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro if err != nil { return data, err } - c.chunkCache.SetChunk(chunkView.FileId, data) + if chunkView.LogicOffset == 0 { + // only cache the first chunk + c.chunkCache.SetChunk(chunkView.FileId, data) + } } return data, err }) @@ -243,3 +260,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) return data, err } + +func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) { + + glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId) + + data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length)) + + glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId) + + return data, err + +} diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go new file mode 100644 index 000000000..2bf18d141 --- /dev/null +++ b/weed/filer/reader_pattern.go @@ -0,0 +1,31 @@ +package filer + +type ReaderPattern struct { + isStreaming bool + lastReadOffset int64 +} + +// For streaming read: only cache the first chunk +// For random read: only fetch the requested range, instead of the whole chunk + +func NewReaderPattern() *ReaderPattern { + return &ReaderPattern{ + isStreaming: true, + lastReadOffset: 0, + } +} + +func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { + if rp.lastReadOffset > offset { + rp.isStreaming = false + } + rp.lastReadOffset = offset +} + +func (rp *ReaderPattern) IsStreamingMode() bool { + return rp.isStreaming +} + +func (rp *ReaderPattern) IsRandomMode() bool { + return !rp.isStreaming +} diff --git a/weed/filer/redis3/redis_sentinel_store.go b/weed/filer/redis3/redis_sentinel_store.go new file mode 100644 index 000000000..a87302167 --- /dev/null +++ b/weed/filer/redis3/redis_sentinel_store.go @@ -0,0 +1,49 @@ +package redis3 + +import ( + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &Redis3SentinelStore{}) +} + +type Redis3SentinelStore struct { + UniversalRedis3Store +} + +func (store *Redis3SentinelStore) GetName() string { + return "redis3_sentinel" +} + +func (store *Redis3SentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"masterName"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis3SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { + store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: masterName, + SentinelAddrs: addresses, + Username: username, + Password: password, + DB: database, + MinRetryBackoff: time.Millisecond * 100, + MaxRetryBackoff: time.Minute * 1, + ReadTimeout: time.Second * 30, + WriteTimeout: time.Second * 5, + }) + store.redsync = redsync.New(goredis.NewPool(store.Client)) + return +} |
