aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorbukton <buk_ton2@hotmail.com>2020-04-19 00:21:45 +0700
committerbukton <buk_ton2@hotmail.com>2020-04-19 00:21:45 +0700
commit290c6b7f01f7b148a65ba10dd6536ad2567d7653 (patch)
treee1abb141849419c40691097eea032b944fcbd748 /weed/filer2
parent6234ea441b6388838a19635c656316047f42917d (diff)
parent11f5a6d91346e5f3cbf3b46e0a660e231c5c2998 (diff)
downloadseaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.tar.xz
seaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.zip
Merge remote-tracking branch 'origin/master' into filer_mongodb
# Conflicts: # go.mod # go.sum # weed/server/filer_server.go
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/entry.go1
-rw-r--r--weed/filer2/entry_codec.go6
-rw-r--r--weed/filer2/filechunks.go37
-rw-r--r--weed/filer2/filer.go14
-rw-r--r--weed/filer2/filer_buckets.go19
-rw-r--r--weed/filer2/filer_notify.go24
-rw-r--r--weed/filer2/filer_notify_append.go47
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go4
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go4
-rw-r--r--weed/filer2/reader_at.go14
-rw-r--r--weed/filer2/redis2/redis_cluster_store.go42
-rw-r--r--weed/filer2/redis2/redis_store.go36
-rw-r--r--weed/filer2/redis2/universal_redis_store.go162
-rw-r--r--weed/filer2/stream.go4
-rw-r--r--weed/filer2/topics.go6
15 files changed, 359 insertions, 61 deletions
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index ef6c8f9a6..6dff99af9 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -21,6 +21,7 @@ type Attr struct {
UserName string
GroupNames []string
SymlinkTarget string
+ Md5 []byte
}
func (attr Attr) IsDirectory() bool {
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index 3a2dc6134..47c911011 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -52,6 +52,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
+ Md5: entry.Attr.Md5,
}
}
@@ -71,6 +72,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget
+ t.Md5 = attr.Md5
return t
}
@@ -93,6 +95,10 @@ func EqualEntry(a, b *Entry) bool {
return false
}
+ if !bytes.Equal(a.Md5, b.Md5) {
+ return false
+ }
+
for i := 0; i < len(a.Chunks); i++ {
if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
return false
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 0c93c389b..2ddfb3c30 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -20,7 +20,21 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
return
}
-func ETag(chunks []*filer_pb.FileChunk) (etag string) {
+func ETag(entry *filer_pb.Entry) (etag string) {
+ if entry.Attributes == nil || entry.Attributes.Md5 == nil {
+ return ETagChunks(entry.Chunks)
+ }
+ return fmt.Sprintf("%x", entry.Attributes.Md5)
+}
+
+func ETagEntry(entry *Entry) (etag string) {
+ if entry.Attr.Md5 == nil {
+ return ETagChunks(entry.Chunks)
+ }
+ return fmt.Sprintf("%x", entry.Attr.Md5)
+}
+
+func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
if len(chunks) == 1 {
return chunks[0].ETag
}
@@ -71,11 +85,15 @@ type ChunkView struct {
Offset int64
Size uint64
LogicOffset int64
- IsFullChunk bool
+ ChunkSize uint64
CipherKey []byte
IsGzipped bool
}
+func (cv *ChunkView) IsFullChunk() bool {
+ return cv.Size == cv.ChunkSize
+}
+
func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles := NonOverlappingVisibleIntervals(chunks)
@@ -97,13 +115,12 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
for _, chunk := range visibles {
if chunk.start <= offset && offset < chunk.stop && offset < stop {
- isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
views = append(views, &ChunkView{
FileId: chunk.fileId,
Offset: offset - chunk.start, // offset is the data starting location in this file id
Size: uint64(min(chunk.stop, stop) - offset),
LogicOffset: offset,
- IsFullChunk: isFullChunk,
+ ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped,
})
@@ -132,7 +149,7 @@ var bufPool = sync.Pool{
func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
- newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped)
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped)
length := len(visibles)
if length == 0 {
@@ -146,11 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
logPrintf(" before", visibles)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
+ newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
+ newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
@@ -202,18 +219,18 @@ type VisibleInterval struct {
stop int64
modifiedTime int64
fileId string
- isFullChunk bool
+ chunkSize uint64
cipherKey []byte
isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTime: modifiedTime,
- isFullChunk: isFullChunk,
+ chunkSize: chunkSize,
cipherKey: cipherKey,
isGzipped: isGzipped,
}
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index af17bf56c..acd609847 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -13,8 +13,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/queue"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -32,20 +32,24 @@ type Filer struct {
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
- DirQueuesPath string
+ FsyncBuckets []string
buckets *FilerBuckets
Cipher bool
- metaLogBuffer *queue.LogBuffer
+ metaLogBuffer *log_buffer.LogBuffer
+ metaLogCollection string
+ metaLogReplication string
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, notifyFn func()) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
}
- f.metaLogBuffer = queue.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+ f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+ f.metaLogCollection = collection
+ f.metaLogReplication = replication
go f.loopProcessingDeletion()
diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go
index 3fc4afdab..7a57e7ee1 100644
--- a/weed/filer2/filer_buckets.go
+++ b/weed/filer2/filer_buckets.go
@@ -13,6 +13,7 @@ type BucketName string
type BucketOption struct {
Name BucketName
Replication string
+ fsync bool
}
type FilerBuckets struct {
dirBucketsPath string
@@ -20,36 +21,42 @@ type FilerBuckets struct {
sync.RWMutex
}
-func (f *Filer) LoadBuckets(dirBucketsPath string) {
+func (f *Filer) LoadBuckets() {
f.buckets = &FilerBuckets{
buckets: make(map[BucketName]*BucketOption),
}
- f.DirBucketsPath = dirBucketsPath
limit := math.MaxInt32
- entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit)
+ entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit)
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
return
}
+ shouldFsyncMap := make(map[string]bool)
+ for _, bucket := range f.FsyncBuckets {
+ shouldFsyncMap[bucket] = true
+ }
+
glog.V(1).Infof("buckets found: %d", len(entries))
f.buckets.Lock()
for _, entry := range entries {
+ _, shouldFsnyc := shouldFsyncMap[entry.Name()]
f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
Name: BucketName(entry.Name()),
Replication: entry.Replication,
+ fsync: shouldFsnyc,
}
}
f.buckets.Unlock()
}
-func (f *Filer) ReadBucketOption(buketName string) (replication string) {
+func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
f.buckets.RLock()
defer f.buckets.RUnlock()
@@ -57,9 +64,9 @@ func (f *Filer) ReadBucketOption(buketName string) (replication string) {
option, found := f.buckets.buckets[BucketName(buketName)]
if !found {
- return ""
+ return "", false
}
- return option.Replication
+ return option.Replication, option.fsync
}
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index f85ee1db4..c4fcefd54 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -25,7 +25,7 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
// println("fullpath:", fullpath)
- if strings.HasPrefix(fullpath, "/.meta") {
+ if strings.HasPrefix(fullpath, SystemLogDir) {
return
}
@@ -45,32 +45,34 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
notification.Queue.SendMessage(fullpath, eventNotification)
}
- f.logMetaEvent(time.Now(), fullpath, eventNotification)
+ f.logMetaEvent(fullpath, eventNotification)
}
-func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *filer_pb.EventNotification) {
+func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) {
dir, _ := util.FullPath(fullpath).DirAndName()
- event := &filer_pb.FullEventNotification{
+ event := &filer_pb.SubscribeMetadataResponse{
Directory: dir,
EventNotification: eventNotification,
}
data, err := proto.Marshal(event)
if err != nil {
- glog.Errorf("failed to marshal filer_pb.FullEventNotification %+v: %v", event, err)
+ glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
return
}
- f.metaLogBuffer.AddToBuffer(ts, []byte(dir), data)
+ f.metaLogBuffer.AddToBuffer([]byte(dir), data)
}
func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
- targetFile := fmt.Sprintf("/.meta/log/%04d/%02d/%02d/%02d/%02d/%02d.%09d.log",
+
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- startTime.Second(), startTime.Nanosecond())
+ // startTime.Second(), startTime.Nanosecond(),
+ )
if err := f.appendToFile(targetFile, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
@@ -95,11 +97,11 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
}
- event := &filer_pb.FullEventNotification{}
+ event := &filer_pb.SubscribeMetadataResponse{}
err = proto.Unmarshal(logEntry.Data, event)
if err != nil {
- glog.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err)
- return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err)
+ glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
}
err = eachEventFn(event.Directory, event.EventNotification)
diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go
index 4c134ae66..6671cd909 100644
--- a/weed/filer2/filer_notify_append.go
+++ b/weed/filer2/filer_notify_append.go
@@ -13,26 +13,11 @@ import (
func (f *Filer) appendToFile(targetFile string, data []byte) error {
- // assign a volume location
- assignRequest := &operation.VolumeAssignRequest{
- Count: 1,
- }
- assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
- if err != nil {
- return fmt.Errorf("AssignVolume: %v", err)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("AssignVolume error: %v", assignResult.Error)
+ assignResult, uploadResult, err2 := f.assignAndUpload(data)
+ if err2 != nil {
+ return err2
}
- // upload data
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth)
- if err != nil {
- return fmt.Errorf("upload data %s: %v", targetUrl, err)
- }
- // println("uploaded to", targetUrl)
-
// find out existing entry
fullpath := util.FullPath(targetFile)
entry, err := f.FindEntry(context.Background(), fullpath)
@@ -68,3 +53,29 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
return err
}
+
+func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+ // assign a volume location
+ assignRequest := &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: f.metaLogCollection,
+ Replication: f.metaLogReplication,
+ WritableVolumeCount: 1,
+ }
+ assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("AssignVolume: %v", err)
+ }
+ if assignResult.Error != "" {
+ return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
+ }
+
+ // upload data
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 21d126322..4f415bb9c 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index 324b07d6c..d4ab2c163 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, 0, nil)
+ filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index b9913d2ca..f56ef6388 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -9,8 +9,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/pb_cache"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -22,12 +22,12 @@ type ChunkReadAt struct {
lookupFileId func(fileId string) (targetUrl string, err error)
readerLock sync.Mutex
- chunkCache *pb_cache.ChunkCache
+ chunkCache *chunk_cache.ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})
-func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *pb_cache.ChunkCache) *ChunkReadAt {
+func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
@@ -105,9 +105,11 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
// fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
- chunkData := c.chunkCache.GetChunk(chunkView.FileId)
+ hasDataInCache := false
+ chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
if chunkData != nil {
glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+ hasDataInCache = true
} else {
chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
if err != nil {
@@ -121,7 +123,9 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)]
- c.chunkCache.SetChunk(chunkView.FileId, chunkData)
+ if !hasDataInCache {
+ c.chunkCache.SetChunk(chunkView.FileId, chunkData)
+ }
return data, nil
}
diff --git a/weed/filer2/redis2/redis_cluster_store.go b/weed/filer2/redis2/redis_cluster_store.go
new file mode 100644
index 000000000..b252eabab
--- /dev/null
+++ b/weed/filer2/redis2/redis_cluster_store.go
@@ -0,0 +1,42 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &RedisCluster2Store{})
+}
+
+type RedisCluster2Store struct {
+ UniversalRedis2Store
+}
+
+func (store *RedisCluster2Store) GetName() string {
+ return "redis_cluster2"
+}
+
+func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", true)
+ configuration.SetDefault(prefix+"routeByLatency", true)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ )
+}
+
+func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ return
+}
diff --git a/weed/filer2/redis2/redis_store.go b/weed/filer2/redis2/redis_store.go
new file mode 100644
index 000000000..1e2a20043
--- /dev/null
+++ b/weed/filer2/redis2/redis_store.go
@@ -0,0 +1,36 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &Redis2Store{})
+}
+
+type Redis2Store struct {
+ UniversalRedis2Store
+}
+
+func (store *Redis2Store) GetName() string {
+ return "redis2"
+}
+
+func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ return
+}
diff --git a/weed/filer2/redis2/universal_redis_store.go b/weed/filer2/redis2/universal_redis_store.go
new file mode 100644
index 000000000..420336b46
--- /dev/null
+++ b/weed/filer2/redis2/universal_redis_store.go
@@ -0,0 +1,162 @@
+package redis2
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-redis/redis"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedis2Store struct {
+ Client redis.UniversalClient
+}
+
+func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+ if name != "" {
+ if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
+ return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
+
+ data, err := store.Client.Get(string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks([]byte(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ _, err = store.Client.Del(string(fullpath)).Result()
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ dir, name := fullpath.DirAndName()
+ if name != "" {
+ _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+
+ dirListKey := genDirectoryListKey(string(fullpath))
+ start := int64(0)
+ if startFileName != "" {
+ start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
+ if !inclusive {
+ start++
+ }
+ }
+ members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ // fetch entry meta
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(string(path)).Result()
+ store.Client.ZRem(dirListKey, fileName).Result()
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ }
+
+ return entries, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedis2Store) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index bf3781ae2..3cb69f72b 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -31,7 +31,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
for _, chunkView := range chunkViews {
urlString := fileId2Url[chunkView.FileId]
- err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
@@ -128,7 +128,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {
diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go
new file mode 100644
index 000000000..9c6e5c88d
--- /dev/null
+++ b/weed/filer2/topics.go
@@ -0,0 +1,6 @@
+package filer2
+
+const (
+ TopicsDir = "/topics"
+ SystemLogDir = TopicsDir + "/.system/log"
+)