diff options
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/entry.go | 1 | ||||
| -rw-r--r-- | weed/filer2/entry_codec.go | 6 | ||||
| -rw-r--r-- | weed/filer2/filechunks.go | 37 | ||||
| -rw-r--r-- | weed/filer2/filer.go | 14 | ||||
| -rw-r--r-- | weed/filer2/filer_buckets.go | 19 | ||||
| -rw-r--r-- | weed/filer2/filer_notify.go | 24 | ||||
| -rw-r--r-- | weed/filer2/filer_notify_append.go | 47 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store_test.go | 4 | ||||
| -rw-r--r-- | weed/filer2/leveldb2/leveldb2_store_test.go | 4 | ||||
| -rw-r--r-- | weed/filer2/reader_at.go | 14 | ||||
| -rw-r--r-- | weed/filer2/redis2/redis_cluster_store.go | 42 | ||||
| -rw-r--r-- | weed/filer2/redis2/redis_store.go | 36 | ||||
| -rw-r--r-- | weed/filer2/redis2/universal_redis_store.go | 162 | ||||
| -rw-r--r-- | weed/filer2/stream.go | 4 | ||||
| -rw-r--r-- | weed/filer2/topics.go | 6 |
15 files changed, 359 insertions, 61 deletions
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go index ef6c8f9a6..6dff99af9 100644 --- a/weed/filer2/entry.go +++ b/weed/filer2/entry.go @@ -21,6 +21,7 @@ type Attr struct { UserName string GroupNames []string SymlinkTarget string + Md5 []byte } func (attr Attr) IsDirectory() bool { diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go index 3a2dc6134..47c911011 100644 --- a/weed/filer2/entry_codec.go +++ b/weed/filer2/entry_codec.go @@ -52,6 +52,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { UserName: entry.Attr.UserName, GroupName: entry.Attr.GroupNames, SymlinkTarget: entry.Attr.SymlinkTarget, + Md5: entry.Attr.Md5, } } @@ -71,6 +72,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t.UserName = attr.UserName t.GroupNames = attr.GroupName t.SymlinkTarget = attr.SymlinkTarget + t.Md5 = attr.Md5 return t } @@ -93,6 +95,10 @@ func EqualEntry(a, b *Entry) bool { return false } + if !bytes.Equal(a.Md5, b.Md5) { + return false + } + for i := 0; i < len(a.Chunks); i++ { if !proto.Equal(a.Chunks[i], b.Chunks[i]) { return false diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 0c93c389b..2ddfb3c30 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -20,7 +20,21 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { return } -func ETag(chunks []*filer_pb.FileChunk) (etag string) { +func ETag(entry *filer_pb.Entry) (etag string) { + if entry.Attributes == nil || entry.Attributes.Md5 == nil { + return ETagChunks(entry.Chunks) + } + return fmt.Sprintf("%x", entry.Attributes.Md5) +} + +func ETagEntry(entry *Entry) (etag string) { + if entry.Attr.Md5 == nil { + return ETagChunks(entry.Chunks) + } + return fmt.Sprintf("%x", entry.Attr.Md5) +} + +func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { if len(chunks) == 1 { return chunks[0].ETag } @@ -71,11 +85,15 @@ type ChunkView struct { Offset int64 Size uint64 LogicOffset int64 - IsFullChunk bool + ChunkSize uint64 CipherKey []byte IsGzipped bool } +func (cv *ChunkView) IsFullChunk() bool { + return cv.Size == cv.ChunkSize +} + func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { visibles := NonOverlappingVisibleIntervals(chunks) @@ -97,13 +115,12 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int for _, chunk := range visibles { if chunk.start <= offset && offset < chunk.stop && offset < stop { - isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop views = append(views, &ChunkView{ FileId: chunk.fileId, Offset: offset - chunk.start, // offset is the data starting location in this file id Size: uint64(min(chunk.stop, stop) - offset), LogicOffset: offset, - IsFullChunk: isFullChunk, + ChunkSize: chunk.chunkSize, CipherKey: chunk.cipherKey, IsGzipped: chunk.isGzipped, }) @@ -132,7 +149,7 @@ var bufPool = sync.Pool{ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { - newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped) + newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped) length := len(visibles) if length == 0 { @@ -146,11 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. logPrintf(" before", visibles) for _, v := range visibles { if v.start < chunk.Offset && chunk.Offset < v.stop { - newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped)) + newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped)) } chunkStop := chunk.Offset + int64(chunk.Size) if v.start < chunkStop && chunkStop < v.stop { - newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped)) + newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped)) } if chunkStop <= v.start || v.stop <= chunk.Offset { newVisibles = append(newVisibles, v) @@ -202,18 +219,18 @@ type VisibleInterval struct { stop int64 modifiedTime int64 fileId string - isFullChunk bool + chunkSize uint64 cipherKey []byte isGzipped bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval { +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { return VisibleInterval{ start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime, - isFullChunk: isFullChunk, + chunkSize: chunkSize, cipherKey: cipherKey, isGzipped: isGzipped, } diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index af17bf56c..acd609847 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -13,8 +13,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/queue" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -32,20 +32,24 @@ type Filer struct { fileIdDeletionQueue *util.UnboundedQueue GrpcDialOption grpc.DialOption DirBucketsPath string - DirQueuesPath string + FsyncBuckets []string buckets *FilerBuckets Cipher bool - metaLogBuffer *queue.LogBuffer + metaLogBuffer *log_buffer.LogBuffer + metaLogCollection string + metaLogReplication string } -func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, notifyFn func()) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { f := &Filer{ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, } - f.metaLogBuffer = queue.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.metaLogCollection = collection + f.metaLogReplication = replication go f.loopProcessingDeletion() diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go index 3fc4afdab..7a57e7ee1 100644 --- a/weed/filer2/filer_buckets.go +++ b/weed/filer2/filer_buckets.go @@ -13,6 +13,7 @@ type BucketName string type BucketOption struct { Name BucketName Replication string + fsync bool } type FilerBuckets struct { dirBucketsPath string @@ -20,36 +21,42 @@ type FilerBuckets struct { sync.RWMutex } -func (f *Filer) LoadBuckets(dirBucketsPath string) { +func (f *Filer) LoadBuckets() { f.buckets = &FilerBuckets{ buckets: make(map[BucketName]*BucketOption), } - f.DirBucketsPath = dirBucketsPath limit := math.MaxInt32 - entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit) + entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit) if err != nil { glog.V(1).Infof("no buckets found: %v", err) return } + shouldFsyncMap := make(map[string]bool) + for _, bucket := range f.FsyncBuckets { + shouldFsyncMap[bucket] = true + } + glog.V(1).Infof("buckets found: %d", len(entries)) f.buckets.Lock() for _, entry := range entries { + _, shouldFsnyc := shouldFsyncMap[entry.Name()] f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ Name: BucketName(entry.Name()), Replication: entry.Replication, + fsync: shouldFsnyc, } } f.buckets.Unlock() } -func (f *Filer) ReadBucketOption(buketName string) (replication string) { +func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) { f.buckets.RLock() defer f.buckets.RUnlock() @@ -57,9 +64,9 @@ func (f *Filer) ReadBucketOption(buketName string) (replication string) { option, found := f.buckets.buckets[BucketName(buketName)] if !found { - return "" + return "", false } - return option.Replication + return option.Replication, option.fsync } diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index f85ee1db4..c4fcefd54 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -25,7 +25,7 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) // println("fullpath:", fullpath) - if strings.HasPrefix(fullpath, "/.meta") { + if strings.HasPrefix(fullpath, SystemLogDir) { return } @@ -45,32 +45,34 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) notification.Queue.SendMessage(fullpath, eventNotification) } - f.logMetaEvent(time.Now(), fullpath, eventNotification) + f.logMetaEvent(fullpath, eventNotification) } -func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *filer_pb.EventNotification) { +func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) { dir, _ := util.FullPath(fullpath).DirAndName() - event := &filer_pb.FullEventNotification{ + event := &filer_pb.SubscribeMetadataResponse{ Directory: dir, EventNotification: eventNotification, } data, err := proto.Marshal(event) if err != nil { - glog.Errorf("failed to marshal filer_pb.FullEventNotification %+v: %v", event, err) + glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err) return } - f.metaLogBuffer.AddToBuffer(ts, []byte(dir), data) + f.metaLogBuffer.AddToBuffer([]byte(dir), data) } func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { - targetFile := fmt.Sprintf("/.meta/log/%04d/%02d/%02d/%02d/%02d/%02d.%09d.log", + + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - startTime.Second(), startTime.Nanosecond()) + // startTime.Second(), startTime.Nanosecond(), + ) if err := f.appendToFile(targetFile, buf); err != nil { glog.V(0).Infof("log write failed %s: %v", targetFile, err) @@ -95,11 +97,11 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err) } - event := &filer_pb.FullEventNotification{} + event := &filer_pb.SubscribeMetadataResponse{} err = proto.Unmarshal(logEntry.Data, event) if err != nil { - glog.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err) - return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err) + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) } err = eachEventFn(event.Directory, event.EventNotification) diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go index 4c134ae66..6671cd909 100644 --- a/weed/filer2/filer_notify_append.go +++ b/weed/filer2/filer_notify_append.go @@ -13,26 +13,11 @@ import ( func (f *Filer) appendToFile(targetFile string, data []byte) error { - // assign a volume location - assignRequest := &operation.VolumeAssignRequest{ - Count: 1, - } - assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) - if err != nil { - return fmt.Errorf("AssignVolume: %v", err) - } - if assignResult.Error != "" { - return fmt.Errorf("AssignVolume error: %v", assignResult.Error) + assignResult, uploadResult, err2 := f.assignAndUpload(data) + if err2 != nil { + return err2 } - // upload data - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth) - if err != nil { - return fmt.Errorf("upload data %s: %v", targetUrl, err) - } - // println("uploaded to", targetUrl) - // find out existing entry fullpath := util.FullPath(targetFile) entry, err := f.FindEntry(context.Background(), fullpath) @@ -68,3 +53,29 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { return err } + +func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { + // assign a volume location + assignRequest := &operation.VolumeAssignRequest{ + Count: 1, + Collection: f.metaLogCollection, + Replication: f.metaLogReplication, + WritableVolumeCount: 1, + } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + if err != nil { + return nil, nil, fmt.Errorf("AssignVolume: %v", err) + } + if assignResult.Error != "" { + return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error) + } + + // upload data + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) + if err != nil { + return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + // println("uploaded to", targetUrl) + return assignResult, uploadResult, nil +} diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 21d126322..4f415bb9c 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -11,7 +11,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDBStore{} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index 324b07d6c..d4ab2c163 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -11,7 +11,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDB2Store{} @@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDB2Store{} diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index b9913d2ca..f56ef6388 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -9,8 +9,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/pb_cache" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -22,12 +22,12 @@ type ChunkReadAt struct { lookupFileId func(fileId string) (targetUrl string, err error) readerLock sync.Mutex - chunkCache *pb_cache.ChunkCache + chunkCache *chunk_cache.ChunkCache } // var _ = io.ReaderAt(&ChunkReadAt{}) -func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *pb_cache.ChunkCache) *ChunkReadAt { +func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, @@ -105,9 +105,11 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err // fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) - chunkData := c.chunkCache.GetChunk(chunkView.FileId) + hasDataInCache := false + chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) if chunkData != nil { glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + hasDataInCache = true } else { chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) if err != nil { @@ -121,7 +123,9 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)] - c.chunkCache.SetChunk(chunkView.FileId, chunkData) + if !hasDataInCache { + c.chunkCache.SetChunk(chunkView.FileId, chunkData) + } return data, nil } diff --git a/weed/filer2/redis2/redis_cluster_store.go b/weed/filer2/redis2/redis_cluster_store.go new file mode 100644 index 000000000..b252eabab --- /dev/null +++ b/weed/filer2/redis2/redis_cluster_store.go @@ -0,0 +1,42 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer2.Stores = append(filer2.Stores, &RedisCluster2Store{}) +} + +type RedisCluster2Store struct { + UniversalRedis2Store +} + +func (store *RedisCluster2Store) GetName() string { + return "redis_cluster2" +} + +func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", true) + configuration.SetDefault(prefix+"routeByLatency", true) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + ) +} + +func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + return +} diff --git a/weed/filer2/redis2/redis_store.go b/weed/filer2/redis2/redis_store.go new file mode 100644 index 000000000..1e2a20043 --- /dev/null +++ b/weed/filer2/redis2/redis_store.go @@ -0,0 +1,36 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer2.Stores = append(filer2.Stores, &Redis2Store{}) +} + +type Redis2Store struct { + UniversalRedis2Store +} + +func (store *Redis2Store) GetName() string { + return "redis2" +} + +func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + return +} diff --git a/weed/filer2/redis2/universal_redis_store.go b/weed/filer2/redis2/universal_redis_store.go new file mode 100644 index 000000000..420336b46 --- /dev/null +++ b/weed/filer2/redis2/universal_redis_store.go @@ -0,0 +1,162 @@ +package redis2 + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedis2Store struct { + Client redis.UniversalClient +} + +func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + if name != "" { + if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { + + data, err := store.Client.Get(string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks([]byte(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + _, err = store.Client.Del(string(fullpath)).Result() + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + dir, name := fullpath.DirAndName() + if name != "" { + _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + + members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() + if err != nil { + return fmt.Errorf("delete folder %s : %v", fullpath, err) + } + + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + _, err = store.Client.Del(string(path)).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer2.Entry, err error) { + + dirListKey := genDirectoryListKey(string(fullpath)) + start := int64(0) + if startFileName != "" { + start, _ = store.Client.ZRank(dirListKey, startFileName).Result() + if !inclusive { + start++ + } + } + members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result() + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + entry, err := store.FindEntry(ctx, path) + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(string(path)).Result() + store.Client.ZRem(dirListKey, fileName).Result() + continue + } + } + entries = append(entries, entry) + } + } + + return entries, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedis2Store) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index bf3781ae2..3cb69f72b 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -31,7 +31,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f for _, chunkView := range chunkViews { urlString := fileId2Url[chunkView.FileId] - err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) { + err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { w.Write(data) }) if err != nil { @@ -128,7 +128,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return err } var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) { + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if err != nil { diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go new file mode 100644 index 000000000..9c6e5c88d --- /dev/null +++ b/weed/filer2/topics.go @@ -0,0 +1,6 @@ +package filer2 + +const ( + TopicsDir = "/topics" + SystemLogDir = TopicsDir + "/.system/log" +) |
