aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
committershibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
commit40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
treea085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/filer2
parentd892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parentfbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
downloadseaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz
seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go17
-rw-r--r--weed/filer2/cassandra/cassandra_store.go23
-rw-r--r--weed/filer2/entry.go12
-rw-r--r--weed/filer2/entry_codec.go6
-rw-r--r--weed/filer2/etcd/etcd_store.go22
-rw-r--r--weed/filer2/filechunks.go78
-rw-r--r--weed/filer2/filechunks_test.go2
-rw-r--r--weed/filer2/filer.go108
-rw-r--r--weed/filer2/filer_buckets.go121
-rw-r--r--weed/filer2/filer_client_util.go172
-rw-r--r--weed/filer2/filer_delete_entry.go50
-rw-r--r--weed/filer2/filer_deletion.go61
-rw-r--r--weed/filer2/filer_notify.go150
-rw-r--r--weed/filer2/filer_notify_append.go73
-rw-r--r--weed/filer2/filer_notify_test.go4
-rw-r--r--weed/filer2/filerstore.go25
-rw-r--r--weed/filer2/fullpath.go42
-rw-r--r--weed/filer2/leveldb/leveldb_store.go29
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go16
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go27
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go16
-rw-r--r--weed/filer2/mongodb/mongodb_store.go210
-rw-r--r--weed/filer2/postgres/postgres_store.go10
-rw-r--r--weed/filer2/reader_at.go156
-rw-r--r--weed/filer2/redis/universal_redis_store.go38
-rw-r--r--weed/filer2/redis2/redis_cluster_store.go42
-rw-r--r--weed/filer2/redis2/redis_store.go36
-rw-r--r--weed/filer2/redis2/universal_redis_store.go162
-rw-r--r--weed/filer2/stream.go162
-rw-r--r--weed/filer2/tikv/tikv_store.go251
-rw-r--r--weed/filer2/tikv/tikv_store_unsupported.go65
-rw-r--r--weed/filer2/topics.go6
32 files changed, 1475 insertions, 717 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 47fe507a1..5ade18960 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -98,13 +99,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
return nil
}
-func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
+func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName()
row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
entry := &filer2.Entry{
@@ -117,7 +118,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.Fu
return entry, nil
}
-func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -134,7 +135,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
return nil
}
-func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
if err != nil {
@@ -149,7 +150,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
if inclusive {
@@ -171,7 +172,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(fullpath), name),
}
if err = entry.DecodeAttributesAndChunks(data); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
@@ -183,3 +184,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
return entries, nil
}
+
+func (store *AbstractSqlStore) Shutdown() {
+ store.DB.Close()
+}
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index f81ef946f..5dd7d8036 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -3,10 +3,13 @@ package cassandra
import (
"context"
"fmt"
+
+ "github.com/gocql/gocql"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gocql/gocql"
)
func init() {
@@ -72,7 +75,7 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entr
return store.InsertEntry(ctx, entry)
}
-func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName()
var data []byte
@@ -80,12 +83,12 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
dir, name).Consistency(gocql.One).Scan(&data); err != nil {
if err != gocql.ErrNotFound {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
}
if len(data) == 0 {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
entry = &filer2.Entry{
@@ -99,7 +102,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full
return entry, nil
}
-func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -112,7 +115,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
return nil
}
-func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?",
@@ -123,7 +126,7 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
return nil
}
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
@@ -136,7 +139,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
for iter.Scan(&name, &data) {
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(fullpath), name),
}
if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
err = decodeErr
@@ -151,3 +154,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
return entries, err
}
+
+func (store *CassandraStore) Shutdown() {
+ store.session.Close()
+}
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index c901927bb..00b9b132d 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Attr struct {
@@ -20,6 +21,7 @@ type Attr struct {
UserName string
GroupNames []string
SymlinkTarget string
+ Md5 []byte
}
func (attr Attr) IsDirectory() bool {
@@ -27,7 +29,7 @@ func (attr Attr) IsDirectory() bool {
}
type Entry struct {
- FullPath
+ util.FullPath
Attr
Extended map[string][]byte
@@ -71,3 +73,11 @@ func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
Entry: entry.ToProtoEntry(),
}
}
+
+func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
+ return &Entry{
+ FullPath: util.NewFullPath(dir, entry.Name),
+ Attr: PbToEntryAttribute(entry.Attributes),
+ Chunks: entry.Chunks,
+ }
+}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index 3a2dc6134..47c911011 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -52,6 +52,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
+ Md5: entry.Attr.Md5,
}
}
@@ -71,6 +72,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget
+ t.Md5 = attr.Md5
return t
}
@@ -93,6 +95,10 @@ func EqualEntry(a, b *Entry) bool {
return false
}
+ if !bytes.Equal(a.Md5, b.Md5) {
+ return false
+ }
+
for i := 0; i < len(a.Chunks); i++ {
if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
return false
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
index 0f0c01426..2ef65b4a0 100644
--- a/weed/filer2/etcd/etcd_store.go
+++ b/weed/filer2/etcd/etcd_store.go
@@ -6,10 +6,12 @@ import (
"strings"
"time"
+ "go.etcd.io/etcd/clientv3"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
- "go.etcd.io/etcd/clientv3"
)
const (
@@ -90,7 +92,7 @@ func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (e
return store.InsertEntry(ctx, entry)
}
-func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName())
resp, err := store.client.Get(ctx, string(key))
@@ -99,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath)
}
if len(resp.Kvs) == 0 {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
entry = &filer2.Entry{
@@ -113,7 +115,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath)
return entry, nil
}
-func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
if _, err := store.client.Delete(ctx, string(key)); err != nil {
@@ -123,7 +125,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return nil
}
-func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
@@ -134,7 +136,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer
}
func (store *EtcdStore) ListDirectoryEntries(
- ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
@@ -157,7 +159,7 @@ func (store *EtcdStore) ListDirectoryEntries(
break
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
err = decodeErr
@@ -177,7 +179,7 @@ func genKey(dirPath, fileName string) (key []byte) {
return key
}
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
keyPrefix = []byte(string(fullpath))
keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
if len(startFileName) > 0 {
@@ -194,3 +196,7 @@ func getNameFromKey(key []byte) string {
return string(key[sepIndex+1:])
}
+
+func (store *EtcdStore) Shutdown() {
+ store.client.Close()
+}
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index b5876df82..2ddfb3c30 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -3,6 +3,7 @@ package filer2
import (
"fmt"
"hash/fnv"
+ "math"
"sort"
"sync"
@@ -19,7 +20,21 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
return
}
-func ETag(chunks []*filer_pb.FileChunk) (etag string) {
+func ETag(entry *filer_pb.Entry) (etag string) {
+ if entry.Attributes == nil || entry.Attributes.Md5 == nil {
+ return ETagChunks(entry.Chunks)
+ }
+ return fmt.Sprintf("%x", entry.Attributes.Md5)
+}
+
+func ETagEntry(entry *Entry) (etag string) {
+ if entry.Attr.Md5 == nil {
+ return ETagChunks(entry.Chunks)
+ }
+ return fmt.Sprintf("%x", entry.Attr.Md5)
+}
+
+func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
if len(chunks) == 1 {
return chunks[0].ETag
}
@@ -70,10 +85,16 @@ type ChunkView struct {
Offset int64
Size uint64
LogicOffset int64
- IsFullChunk bool
+ ChunkSize uint64
+ CipherKey []byte
+ IsGzipped bool
}
-func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
+func (cv *ChunkView) IsFullChunk() bool {
+ return cv.Size == cv.ChunkSize
+}
+
+func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles := NonOverlappingVisibleIntervals(chunks)
@@ -81,19 +102,27 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views
}
-func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) {
+func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) {
- stop := offset + int64(size)
+ stop := offset + size
+ if size == math.MaxInt64 {
+ stop = math.MaxInt64
+ }
+ if stop < offset {
+ stop = math.MaxInt64
+ }
for _, chunk := range visibles {
+
if chunk.start <= offset && offset < chunk.stop && offset < stop {
- isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
views = append(views, &ChunkView{
FileId: chunk.fileId,
Offset: offset - chunk.start, // offset is the data starting location in this file id
Size: uint64(min(chunk.stop, stop) - offset),
LogicOffset: offset,
- IsFullChunk: isFullChunk,
+ ChunkSize: chunk.chunkSize,
+ CipherKey: chunk.cipherKey,
+ IsGzipped: chunk.isGzipped,
})
offset = min(chunk.stop, stop)
}
@@ -120,13 +149,7 @@ var bufPool = sync.Pool{
func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
- newV := newVisibleInterval(
- chunk.Offset,
- chunk.Offset+int64(chunk.Size),
- chunk.GetFileIdString(),
- chunk.Mtime,
- true,
- )
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped)
length := len(visibles)
if length == 0 {
@@ -140,23 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
logPrintf(" before", visibles)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(
- v.start,
- chunk.Offset,
- v.fileId,
- v.modifiedTime,
- false,
- ))
+ newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false,
- ))
+ newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
@@ -187,6 +198,7 @@ func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []Vi
var newVisibles []VisibleInterval
for _, chunk := range chunks {
+
newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
t := visibles[:0]
visibles = newVisibles
@@ -207,16 +219,20 @@ type VisibleInterval struct {
stop int64
modifiedTime int64
fileId string
- isFullChunk bool
+ chunkSize uint64
+ cipherKey []byte
+ isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTime: modifiedTime,
- isFullChunk: isFullChunk,
+ chunkSize: chunkSize,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
}
}
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index bb4a6c74d..7b1133b85 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -218,7 +218,7 @@ func TestChunksReading(t *testing.T) {
testcases := []struct {
Chunks []*filer_pb.FileChunk
Offset int64
- Size int
+ Size int64
Expected []*ChunkView
}{
// case 0: normal
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index a0af942e0..666ab8fe4 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
- "path/filepath"
"strings"
"time"
@@ -13,6 +12,9 @@ import (
"github.com/karlseguin/ccache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -24,20 +26,30 @@ var (
)
type Filer struct {
- store *FilerStoreWrapper
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
- fileIdDeletionChan chan string
- GrpcDialOption grpc.DialOption
+ store *FilerStoreWrapper
+ directoryCache *ccache.Cache
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionQueue *util.UnboundedQueue
+ GrpcDialOption grpc.DialOption
+ DirBucketsPath string
+ FsyncBuckets []string
+ buckets *FilerBuckets
+ Cipher bool
+ MetaLogBuffer *log_buffer.LogBuffer
+ metaLogCollection string
+ metaLogReplication string
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
- fileIdDeletionChan: make(chan string, PaginationSize),
- GrpcDialOption: grpcDialOption,
+ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
+ fileIdDeletionQueue: util.NewUnboundedQueue(),
+ GrpcDialOption: grpcDialOption,
}
+ f.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+ f.metaLogCollection = collection
+ f.metaLogReplication = replication
go f.loopProcessingDeletion()
@@ -85,7 +97,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
var lastDirectoryEntry *Entry
for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...))
+ dirPath := "/" + util.Join(dirParts[:i]...)
// fmt.Printf("%d directory: %+v\n", i, dirPath)
// first check local cache
@@ -94,7 +106,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
// not found, check the store directly
if dirEntry == nil {
glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
+ dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
} else {
// glog.V(4).Infof("found cached directory: %s", dirPath)
}
@@ -106,24 +118,29 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
now := time.Now()
dirEntry = &Entry{
- FullPath: FullPath(dirPath),
+ FullPath: util.FullPath(dirPath),
Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0770,
- Uid: entry.Uid,
- Gid: entry.Gid,
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
},
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
+ f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(nil, dirEntry, false)
}
@@ -174,6 +191,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
}
}
+ f.maybeAddBucket(entry)
f.NotifyUpdateEvent(oldEntry, entry, true)
f.deleteChunksIfNotNew(oldEntry, entry)
@@ -197,7 +215,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
return f.store.UpdateEntry(ctx, entry)
}
-func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
+func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
now := time.Now()
@@ -213,14 +231,51 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
},
}, nil
}
- return f.store.FindEntry(ctx, p)
+ entry, err = f.store.FindEntry(ctx, p)
+ if entry != nil && entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ return nil, filer_pb.ErrNotFound
+ }
+ }
+ return
+
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+
+ var makeupEntries []*Entry
+ entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ for expiredCount > 0 && err == nil {
+ makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount)
+ if err == nil {
+ entries = append(entries, makeupEntries...)
+ }
+ }
+
+ return entries, err
+}
+
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
+ listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ if listErr != nil {
+ return listedEntries, expiredCount, "", listErr
+ }
+ for _, entry := range listedEntries {
+ lastFileName = entry.Name()
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ expiredCount++
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ return
}
func (f *Filer) cacheDelDirectory(dirpath string) {
@@ -261,3 +316,8 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
}
+
+func (f *Filer) Shutdown() {
+ f.MetaLogBuffer.Shutdown()
+ f.store.Shutdown()
+}
diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go
new file mode 100644
index 000000000..7a57e7ee1
--- /dev/null
+++ b/weed/filer2/filer_buckets.go
@@ -0,0 +1,121 @@
+package filer2
+
+import (
+ "context"
+ "math"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type BucketName string
+type BucketOption struct {
+ Name BucketName
+ Replication string
+ fsync bool
+}
+type FilerBuckets struct {
+ dirBucketsPath string
+ buckets map[BucketName]*BucketOption
+ sync.RWMutex
+}
+
+func (f *Filer) LoadBuckets() {
+
+ f.buckets = &FilerBuckets{
+ buckets: make(map[BucketName]*BucketOption),
+ }
+
+ limit := math.MaxInt32
+
+ entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit)
+
+ if err != nil {
+ glog.V(1).Infof("no buckets found: %v", err)
+ return
+ }
+
+ shouldFsyncMap := make(map[string]bool)
+ for _, bucket := range f.FsyncBuckets {
+ shouldFsyncMap[bucket] = true
+ }
+
+ glog.V(1).Infof("buckets found: %d", len(entries))
+
+ f.buckets.Lock()
+ for _, entry := range entries {
+ _, shouldFsnyc := shouldFsyncMap[entry.Name()]
+ f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
+ Name: BucketName(entry.Name()),
+ Replication: entry.Replication,
+ fsync: shouldFsnyc,
+ }
+ }
+ f.buckets.Unlock()
+
+}
+
+func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
+
+ f.buckets.RLock()
+ defer f.buckets.RUnlock()
+
+ option, found := f.buckets.buckets[BucketName(buketName)]
+
+ if !found {
+ return "", false
+ }
+ return option.Replication, option.fsync
+
+}
+
+func (f *Filer) isBucket(entry *Entry) bool {
+ if !entry.IsDirectory() {
+ return false
+ }
+ parent, dirName := entry.FullPath.DirAndName()
+ if parent != f.DirBucketsPath {
+ return false
+ }
+
+ f.buckets.RLock()
+ defer f.buckets.RUnlock()
+
+ _, found := f.buckets.buckets[BucketName(dirName)]
+
+ return found
+
+}
+
+func (f *Filer) maybeAddBucket(entry *Entry) {
+ if !entry.IsDirectory() {
+ return
+ }
+ parent, dirName := entry.FullPath.DirAndName()
+ if parent != f.DirBucketsPath {
+ return
+ }
+ f.addBucket(dirName, &BucketOption{
+ Name: BucketName(dirName),
+ Replication: entry.Replication,
+ })
+}
+
+func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {
+
+ f.buckets.Lock()
+ defer f.buckets.Unlock()
+
+ f.buckets.buckets[BucketName(buketName)] = bucketOption
+
+}
+
+func (f *Filer) deleteBucket(buketName string) {
+
+ f.buckets.Lock()
+ defer f.buckets.Unlock()
+
+ delete(f.buckets.buckets, BucketName(buketName))
+
+}
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
deleted file mode 100644
index af804b909..000000000
--- a/weed/filer2/filer_client_util.go
+++ /dev/null
@@ -1,172 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "io"
- "math"
- "strings"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func VolumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
-}
-
-type FilerClient interface {
- WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error
-}
-
-func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
- var vids []string
- for _, chunkView := range chunkViews {
- vids = append(vids, VolumeId(chunkView.FileId))
- }
-
- vid2Locations := make(map[string]*filer_pb.Locations)
-
- err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return err
- }
-
- vid2Locations = resp.LocationsMap
-
- return nil
- })
-
- if err != nil {
- return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
- }
-
- var wg sync.WaitGroup
- for _, chunkView := range chunkViews {
- wg.Add(1)
- go func(chunkView *ChunkView) {
- defer wg.Done()
-
- glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
-
- locations := vid2Locations[VolumeId(chunkView.FileId)]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", chunkView.FileId)
- err = fmt.Errorf("failed to locate %s", chunkView.FileId)
- return
- }
-
- var n int64
- n, err = util.ReadUrl(
- fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
- chunkView.Offset,
- int(chunkView.Size),
- buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
- !chunkView.IsFullChunk)
-
- if err != nil {
-
- glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
-
- err = fmt.Errorf("failed to read http://%s/%s: %v",
- locations.Locations[0].Url, chunkView.FileId, err)
- return
- }
-
- glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
- totalRead += n
-
- }(chunkView)
- }
- wg.Wait()
- return
-}
-
-func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
-
- dir, name := fullFilePath.DirAndName()
-
- err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- }
-
- // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
- return nil
- }
- glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
- return err
- }
-
- if resp.Entry == nil {
- // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
- return nil
- }
-
- entry = resp.Entry
- return nil
- })
-
- return
-}
-
-func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
-
- err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- lastEntryName := ""
-
- request := &filer_pb.ListEntriesRequest{
- Directory: string(fullDirPath),
- Prefix: prefix,
- StartFromFileName: lastEntryName,
- Limit: math.MaxUint32,
- }
-
- glog.V(3).Infof("read directory: %v", request)
- stream, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
-
- var prevEntry *filer_pb.Entry
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- if prevEntry != nil {
- fn(prevEntry, true)
- }
- break
- } else {
- return recvErr
- }
- }
- if prevEntry != nil {
- fn(prevEntry, false)
- }
- prevEntry = resp.Entry
- }
-
- return nil
-
- })
-
- return
-}
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
index 75a09e7ef..2fb53c579 100644
--- a/weed/filer2/filer_delete_entry.go
+++ b/weed/filer2/filer_delete_entry.go
@@ -6,9 +6,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
if p == "/" {
return nil
}
@@ -18,27 +20,35 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
return findErr
}
+ isCollection := f.isBucket(entry)
+
var chunks []*filer_pb.FileChunk
chunks = append(chunks, entry.Chunks...)
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection)
if err != nil {
+ glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
}
chunks = append(chunks, dirChunks...)
- f.cacheDelDirectory(string(p))
}
+
// delete the file or folder
err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
if err != nil {
return fmt.Errorf("delete file %s: %v", p, err)
}
- if shouldDeleteChunks {
+ if shouldDeleteChunks && !isCollection {
go f.DeleteChunks(chunks)
}
+ if isCollection {
+ collectionName := entry.Name()
+ f.doDeleteCollection(collectionName)
+ f.deleteBucket(collectionName)
+ }
return nil
}
@@ -63,13 +73,15 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
var dirChunks []*filer_pb.FileChunk
if sub.IsDirectory() {
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ f.cacheDelDirectory(string(sub.FullPath))
+ f.NotifyUpdateEvent(sub, nil, shouldDeleteChunks)
+ chunks = append(chunks, dirChunks...)
+ } else {
+ chunks = append(chunks, sub.Chunks...)
}
if err != nil && !ignoreRecursiveError {
return nil, err
}
- if shouldDeleteChunks {
- chunks = append(chunks, dirChunks...)
- }
}
if len(entries) < PaginationSize {
@@ -77,26 +89,40 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
}
}
- f.cacheDelDirectory(string(entry.FullPath))
-
- glog.V(3).Infof("deleting directory %v", entry.FullPath)
+ glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
return chunks, nil
}
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
- glog.V(3).Infof("deleting entry %v", entry.FullPath)
+ glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
+ if entry.IsDirectory() {
+ f.cacheDelDirectory(string(entry.FullPath))
+ }
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
return nil
}
+
+func (f *Filer) doDeleteCollection(collectionName string) (err error) {
+
+ return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: collectionName,
+ })
+ if err != nil {
+ glog.Infof("delete collection %s: %v", collectionName, err)
+ }
+ return err
+ })
+
+}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 9937685f7..a6b229771 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -6,16 +6,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func (f *Filer) loopProcessingDeletion() {
-
- ticker := time.NewTicker(5 * time.Second)
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
+func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) {
+ return func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
- locs, _ := f.MasterClient.GetVidLocations(vid)
+ locs, _ := masterClient.GetVidLocations(vid)
var locations []operation.Location
for _, loc := range locs {
locations = append(locations, operation.Location{
@@ -30,37 +28,56 @@ func (f *Filer) loopProcessingDeletion() {
}
return m, nil
}
+}
+
+func (f *Filer) loopProcessingDeletion() {
+
+ lookupFunc := LookupByMasterClientFn(f.MasterClient)
+
+ DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
- var fileIds []string
+ var deletionCount int
for {
- select {
- case fid := <-f.fileIdDeletionChan:
- fileIds = append(fileIds, fid)
- if len(fileIds) >= 4096 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
+ deletionCount = 0
+ f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+ for len(fileIds) > 0 {
+ var toDeleteFileIds []string
+ if len(fileIds) > DeletionBatchSize {
+ toDeleteFileIds = fileIds[:DeletionBatchSize]
+ fileIds = fileIds[DeletionBatchSize:]
+ } else {
+ toDeleteFileIds = fileIds
+ fileIds = fileIds[:0]
+ }
+ deletionCount = len(toDeleteFileIds)
+ deleteResults, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+ if err != nil {
+ glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ } else {
+ glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
+ }
+ if len(deleteResults) != deletionCount {
+ glog.V(0).Infof("delete %d fileIds actual %d", deletionCount, len(deleteResults))
+ }
}
+ })
+
+ if deletionCount == 0 {
+ time.Sleep(1123 * time.Millisecond)
}
}
}
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- f.fileIdDeletionChan <- chunk.GetFileIdString()
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
}
}
// DeleteFileByFileId direct delete by file id.
// Only used when the fileId is not being managed by snapshots.
func (f *Filer) DeleteFileByFileId(fileId string) {
- f.fileIdDeletionChan <- fileId
+ f.fileIdDeletionQueue.EnQueue(fileId)
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index c37381116..ecb488373 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -1,39 +1,157 @@
package filer2
import (
+ "context"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) {
- var key string
+ var fullpath string
if oldEntry != nil {
- key = string(oldEntry.FullPath)
+ fullpath = string(oldEntry.FullPath)
} else if newEntry != nil {
- key = string(newEntry.FullPath)
+ fullpath = string(newEntry.FullPath)
} else {
return
}
+ // println("fullpath:", fullpath)
+
+ if strings.HasPrefix(fullpath, SystemLogDir) {
+ return
+ }
+
+ newParentPath := ""
+ if newEntry != nil {
+ newParentPath, _ = newEntry.FullPath.DirAndName()
+ }
+ eventNotification := &filer_pb.EventNotification{
+ OldEntry: oldEntry.ToProtoEntry(),
+ NewEntry: newEntry.ToProtoEntry(),
+ DeleteChunks: deleteChunks,
+ NewParentPath: newParentPath,
+ }
+
if notification.Queue != nil {
+ glog.V(3).Infof("notifying entry update %v", fullpath)
+ notification.Queue.SendMessage(fullpath, eventNotification)
+ }
+
+ f.logMetaEvent(fullpath, eventNotification)
+
+}
+
+func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) {
+
+ dir, _ := util.FullPath(fullpath).DirAndName()
+
+ event := &filer_pb.SubscribeMetadataResponse{
+ Directory: dir,
+ EventNotification: eventNotification,
+ TsNs: time.Now().UnixNano(),
+ }
+ data, err := proto.Marshal(event)
+ if err != nil {
+ glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
+ return
+ }
+
+ f.MetaLogBuffer.AddToBuffer([]byte(dir), data)
- glog.V(3).Infof("notifying entry update %v", key)
+}
+
+func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
+
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ // startTime.Second(), startTime.Nanosecond(),
+ )
+
+ if err := f.appendToFile(targetFile, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+}
+
+func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error {
+
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
+ startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
- newParentPath := ""
- if newEntry != nil {
- newParentPath, _ = newEntry.FullPath.DirAndName()
+ sizeBuf := make([]byte, 4)
+ startTsNs := startTime.UnixNano()
+
+ dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366)
+ if listDayErr != nil {
+ return fmt.Errorf("fail to list log by day: %v", listDayErr)
+ }
+ for _, dayEntry := range dayEntries {
+ // println("checking day", dayEntry.FullPath)
+ hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60)
+ if listHourMinuteErr != nil {
+ return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
+ for _, hourMinuteEntry := range hourMinuteEntries {
+ // println("checking hh-mm", hourMinuteEntry.FullPath)
+ if dayEntry.Name() == startDate {
+ if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
+ continue
+ }
+ }
+ // println("processing", hourMinuteEntry.FullPath)
+ chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks)
+ if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ chunkedFileReader.Close()
+ if err == io.EOF {
+ break
+ }
+ return fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
+ }
+ chunkedFileReader.Close()
+ }
+ }
- notification.Queue.SendMessage(
- key,
- &filer_pb.EventNotification{
- OldEntry: oldEntry.ToProtoEntry(),
- NewEntry: newEntry.ToProtoEntry(),
- DeleteChunks: deleteChunks,
- NewParentPath: newParentPath,
- },
- )
+ return nil
+}
+func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error {
+ for {
+ n, err := r.Read(sizeBuf)
+ if err != nil {
+ return err
+ }
+ if n != 4 {
+ return fmt.Errorf("size %d bytes, expected 4 bytes", n)
+ }
+ size := util.BytesToUint32(sizeBuf)
+ // println("entry size", size)
+ entryData := make([]byte, size)
+ n, err = r.Read(entryData)
+ if err != nil {
+ return err
+ }
+ if n != int(size) {
+ return fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
+ }
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ return err
+ }
+ if logEntry.TsNs <= ns {
+ return nil
+ }
+ // println("each log: ", logEntry.TsNs)
+ if err := eachLogEntryFn(logEntry); err != nil {
+ return err
+ }
}
}
diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go
new file mode 100644
index 000000000..af291058c
--- /dev/null
+++ b/weed/filer2/filer_notify_append.go
@@ -0,0 +1,73 @@
+package filer2
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (f *Filer) appendToFile(targetFile string, data []byte) error {
+
+ assignResult, uploadResult, err2 := f.assignAndUpload(data)
+ if err2 != nil {
+ return err2
+ }
+
+ // find out existing entry
+ fullpath := util.FullPath(targetFile)
+ entry, err := f.FindEntry(context.Background(), fullpath)
+ var offset int64 = 0
+ if err == filer_pb.ErrNotFound {
+ entry = &Entry{
+ FullPath: fullpath,
+ Attr: Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+ } else {
+ offset = int64(TotalSize(entry.Chunks))
+ }
+
+ // append to existing chunks
+ entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
+
+ // update the entry
+ err = f.CreateEntry(context.Background(), entry, false)
+
+ return err
+}
+
+func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+ // assign a volume location
+ assignRequest := &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: f.metaLogCollection,
+ Replication: f.metaLogReplication,
+ WritableVolumeCount: 1,
+ }
+ assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("AssignVolume: %v", err)
+ }
+ if assignResult.Error != "" {
+ return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
+ }
+
+ // upload data
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}
diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go
index b74e2ad35..29170bfdf 100644
--- a/weed/filer2/filer_notify_test.go
+++ b/weed/filer2/filer_notify_test.go
@@ -5,13 +5,15 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/golang/protobuf/proto"
)
func TestProtoMarshalText(t *testing.T) {
oldEntry := &Entry{
- FullPath: FullPath("/this/path/to"),
+ FullPath: util.FullPath("/this/path/to"),
Attr: Attr{
Mtime: time.Now(),
Mode: 0644,
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index ae25534ed..f36c74f14 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -2,7 +2,6 @@ package filer2
import (
"context"
- "errors"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -18,17 +17,17 @@ type FilerStore interface {
InsertEntry(context.Context, *Entry) error
UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found
- FindEntry(context.Context, FullPath) (entry *Entry, err error)
- DeleteEntry(context.Context, FullPath) (err error)
- DeleteFolderChildren(context.Context, FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+ FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
+ DeleteEntry(context.Context, util.FullPath) (err error)
+ DeleteFolderChildren(context.Context, util.FullPath) (err error)
+ ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
RollbackTransaction(ctx context.Context) error
-}
-var ErrNotFound = errors.New("filer: no entry is found in filer store")
+ Shutdown()
+}
type FilerStoreWrapper struct {
actualStore FilerStore
@@ -73,7 +72,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
return fsw.actualStore.UpdateEntry(ctx, entry)
}
-func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
+func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
start := time.Now()
defer func() {
@@ -88,7 +87,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry
return
}
-func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
+func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
@@ -98,7 +97,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
return fsw.actualStore.DeleteEntry(ctx, fp)
}
-func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
defer func() {
@@ -108,7 +107,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullP
return fsw.actualStore.DeleteFolderChildren(ctx, fp)
}
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
start := time.Now()
defer func() {
@@ -136,3 +135,7 @@ func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
return fsw.actualStore.RollbackTransaction(ctx)
}
+
+func (fsw *FilerStoreWrapper) Shutdown() {
+ fsw.actualStore.Shutdown()
+}
diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go
deleted file mode 100644
index 133069f93..000000000
--- a/weed/filer2/fullpath.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package filer2
-
-import (
- "path/filepath"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type FullPath string
-
-func NewFullPath(dir, name string) FullPath {
- return FullPath(dir).Child(name)
-}
-
-func (fp FullPath) DirAndName() (string, string) {
- dir, name := filepath.Split(string(fp))
- if dir == "/" {
- return dir, name
- }
- if len(dir) < 1 {
- return "/", ""
- }
- return dir[:len(dir)-1], name
-}
-
-func (fp FullPath) Name() string {
- _, name := filepath.Split(string(fp))
- return name
-}
-
-func (fp FullPath) Child(name string) FullPath {
- dir := string(fp)
- if strings.HasSuffix(dir, "/") {
- return FullPath(dir + name)
- }
- return FullPath(dir + "/" + name)
-}
-
-func (fp FullPath) AsInode() uint64 {
- return uint64(util.HashStringToLong(string(fp)))
-}
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index 44e6ac0eb..31919ca49 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -6,11 +6,13 @@ import (
"fmt"
"github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
@@ -48,8 +50,13 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
}
if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
- glog.Infof("filer store open dir %s: %v", dir, err)
- return
+ if errors.IsCorrupted(err) {
+ store.db, err = leveldb.RecoverFile(dir, opts)
+ }
+ if err != nil {
+ glog.Infof("filer store open dir %s: %v", dir, err)
+ return
+ }
}
return
}
@@ -88,13 +95,13 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry)
return store.InsertEntry(ctx, entry)
}
-func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName())
data, err := store.db.Get(key, nil)
if err == leveldb.ErrNotFound {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
@@ -113,7 +120,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa
return entry, nil
}
-func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
err = store.db.Delete(key, nil)
@@ -124,7 +131,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
return nil
}
-func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
batch := new(leveldb.Batch)
@@ -152,7 +159,7 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath fi
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
@@ -175,7 +182,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath fi
break
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
err = decodeErr
@@ -196,7 +203,7 @@ func genKey(dirPath, fileName string) (key []byte) {
return key
}
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
keyPrefix = []byte(string(fullpath))
keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
if len(startFileName) > 0 {
@@ -215,3 +222,7 @@ func getNameFromKey(key []byte) string {
return string(key[sepIndex+1:])
}
+
+func (store *LevelDBStore) Shutdown() {
+ store.db.Close()
+}
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 983e1cbe9..1daa47c97 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -2,14 +2,16 @@ package leveldb
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"io/ioutil"
"os"
"testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) {
filer.SetStore(store)
filer.DisableDirectoryCache()
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
ctx := context.Background()
@@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -64,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
index 358d4d92a..c907e8746 100644
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -9,11 +9,13 @@ import (
"os"
"github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
@@ -51,9 +53,12 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
dbFolder := fmt.Sprintf("%s/%02d", dir, d)
os.MkdirAll(dbFolder, 0755)
db, dbErr := leveldb.OpenFile(dbFolder, opts)
+ if errors.IsCorrupted(dbErr) {
+ db, dbErr = leveldb.RecoverFile(dbFolder, opts)
+ }
if dbErr != nil {
glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
- return
+ return dbErr
}
store.dbs = append(store.dbs, db)
}
@@ -97,14 +102,14 @@ func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry
return store.InsertEntry(ctx, entry)
}
-func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName()
key, partitionId := genKey(dir, name, store.dbCount)
data, err := store.dbs[partitionId].Get(key, nil)
if err == leveldb.ErrNotFound {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
@@ -123,7 +128,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP
return entry, nil
}
-func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
dir, name := fullpath.DirAndName()
key, partitionId := genKey(dir, name, store.dbCount)
@@ -135,7 +140,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
return nil
}
-func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
batch := new(leveldb.Batch)
@@ -163,7 +168,7 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath f
return nil
}
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
@@ -187,7 +192,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath f
break
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
@@ -210,7 +215,7 @@ func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int)
return key, partitionId
}
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
if len(startFileName) > 0 {
keyPrefix = append(keyPrefix, []byte(startFileName)...)
@@ -235,3 +240,9 @@ func hashToBytes(dir string, dbCount int) ([]byte, int) {
return b, int(x) % dbCount
}
+
+func (store *LevelDB2Store) Shutdown() {
+ for d := 0; d < store.dbCount; d++ {
+ store.dbs[d].Close()
+ }
+}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index 58637b7b6..9ad168233 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -2,14 +2,16 @@ package leveldb
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"io/ioutil"
"os"
"testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) {
filer.SetStore(store)
filer.DisableDirectoryCache()
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
ctx := context.Background()
@@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -64,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer2/mongodb/mongodb_store.go b/weed/filer2/mongodb/mongodb_store.go
new file mode 100644
index 000000000..375a457a4
--- /dev/null
+++ b/weed/filer2/mongodb/mongodb_store.go
@@ -0,0 +1,210 @@
+package mongodb
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/x/bsonx"
+ "time"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &MongodbStore{})
+}
+
+type MongodbStore struct {
+ connect *mongo.Client
+ database string
+ collectionName string
+}
+
+type Model struct {
+ Directory string `bson:"directory"`
+ Name string `bson:"name"`
+ Meta []byte `bson:"meta"`
+}
+
+func (store *MongodbStore) GetName() string {
+ return "mongodb"
+}
+
+func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ store.database = configuration.GetString(prefix + "database")
+ store.collectionName = "filemeta"
+ poolSize := configuration.GetInt(prefix + "option_pool_size")
+ return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize))
+}
+
+func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) {
+ ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+ opts := options.Client().ApplyURI(uri)
+
+ if poolSize > 0 {
+ opts.SetMaxPoolSize(poolSize)
+ }
+
+ client, err := mongo.Connect(ctx, opts)
+ if err != nil {
+ return err
+ }
+
+ c := client.Database(store.database).Collection(store.collectionName)
+ err = store.indexUnique(c)
+ store.connect = client
+ return err
+}
+
+func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error {
+ _, err := c.Indexes().CreateOne(context.Background(), index, opts)
+ return err
+}
+
+func (store *MongodbStore) indexUnique(c *mongo.Collection) error {
+ opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
+
+ unique := new(bool)
+ *unique = true
+
+ index := mongo.IndexModel{
+ Keys: bsonx.Doc{{Key: "directory", Value: bsonx.Int32(1)}, {Key: "name", Value: bsonx.Int32(1)}},
+ Options: &options.IndexOptions{
+ Unique: unique,
+ },
+ }
+
+ return store.createIndex(c, index, opts)
+}
+
+func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+
+func (store *MongodbStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ c := store.connect.Database(store.database).Collection(store.collectionName)
+
+ _, err = c.InsertOne(ctx, Model{
+ Directory: dir,
+ Name: name,
+ Meta: meta,
+ })
+
+ return nil
+}
+
+func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
+
+ dir, name := fullpath.DirAndName()
+ var data Model
+
+ var where = bson.M{"directory": dir, "name": name}
+ err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
+ if err != mongo.ErrNoDocuments && err != nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if len(data.Meta) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+
+ err = entry.DecodeAttributesAndChunks(data.Meta)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+ dir, name := fullpath.DirAndName()
+
+ where := bson.M{"directory": dir, "name": name}
+ _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+
+ where := bson.M{"directory": fullpath}
+ _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+
+ var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}
+ if inclusive {
+ where["name"] = bson.M{
+ "$gte": startFileName,
+ }
+ }
+ optLimit := int64(limit)
+ opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
+ cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
+ for cur.Next(ctx) {
+ var data Model
+ err := cur.Decode(&data)
+ if err != nil && err != mongo.ErrNoDocuments {
+ return nil, err
+ }
+
+ entry := &filer2.Entry{
+ FullPath: util.NewFullPath(string(fullpath), data.Name),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+
+ entries = append(entries, entry)
+ }
+
+ if err := cur.Close(ctx); err != nil {
+ glog.V(0).Infof("list iterator close: %v", err)
+ }
+
+ return entries, err
+}
+
+func (store *MongodbStore) Shutdown() {
+ ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+ store.connect.Disconnect(ctx)
+}
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
index 27a0c2513..51c069aae 100644
--- a/weed/filer2/postgres/postgres_store.go
+++ b/weed/filer2/postgres/postgres_store.go
@@ -11,7 +11,7 @@ import (
)
const (
- CONNECTION_URL_PATTERN = "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30"
+ CONNECTION_URL_PATTERN = "host=%s port=%d user=%s sslmode=%s connect_timeout=30"
)
func init() {
@@ -49,7 +49,13 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
- sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, password, database, sslmode)
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode)
+ if password != "" {
+ sqlUrl += " password=" + password
+ }
+ if database != "" {
+ sqlUrl += " dbname=" + database
+ }
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
new file mode 100644
index 000000000..53fff7672
--- /dev/null
+++ b/weed/filer2/reader_at.go
@@ -0,0 +1,156 @@
+package filer2
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+type ChunkReadAt struct {
+ masterClient *wdclient.MasterClient
+ chunkViews []*ChunkView
+ buffer []byte
+ bufferOffset int64
+ lookupFileId func(fileId string) (targetUrl string, err error)
+ readerLock sync.Mutex
+
+ chunkCache *chunk_cache.ChunkCache
+}
+
+// var _ = io.ReaderAt(&ChunkReadAt{})
+
+type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+
+func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+ return func(fileId string) (targetUrl string, err error) {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ vid := VolumeId(fileId)
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+
+ locations := resp.LocationsMap[vid]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", fileId)
+ return fmt.Errorf("failed to locate %s", fileId)
+ }
+
+ volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
+
+ targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+
+ return nil
+ })
+ return
+ }
+}
+
+func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
+
+ return &ChunkReadAt{
+ chunkViews: chunkViews,
+ lookupFileId: LookupFn(filerClient),
+ bufferOffset: -1,
+ chunkCache: chunkCache,
+ }
+}
+
+func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
+
+ c.readerLock.Lock()
+ defer c.readerLock.Unlock()
+
+ for n < len(p) && err == nil {
+ readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
+ n += readCount
+ err = readErr
+ if readCount == 0 {
+ return n, nil
+ }
+ }
+ return
+}
+
+func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
+
+ var found bool
+ for _, chunk := range c.chunkViews {
+ if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+ found = true
+ if c.bufferOffset != chunk.LogicOffset {
+ c.buffer, err = c.fetchChunkData(chunk)
+ c.bufferOffset = chunk.LogicOffset
+ }
+ break
+ }
+ }
+ if !found {
+ return 0, io.EOF
+ }
+
+ n = copy(p, c.buffer[offset-c.bufferOffset:])
+
+ // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
+
+ return
+
+}
+
+func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err error) {
+
+ // fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+
+ hasDataInCache := false
+ chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ if chunkData != nil {
+ glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+ hasDataInCache = true
+ } else {
+ chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if int64(len(chunkData)) < chunkView.Offset+int64(chunkView.Size) {
+ return nil, fmt.Errorf("unexpected larger chunkView [%d,%d) than chunk %d", chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
+ }
+
+ data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)]
+
+ if !hasDataInCache {
+ c.chunkCache.SetChunk(chunkView.FileId, chunkData)
+ }
+
+ return data, nil
+}
+
+func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+
+ urlString, err := c.lookupFileId(fileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+ var buffer bytes.Buffer
+ err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+
+ return buffer.Bytes(), nil
+}
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index 62257e91e..e5b9e8840 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -3,12 +3,16 @@ package redis
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/go-redis/redis"
"sort"
"strings"
"time"
+
+ "github.com/go-redis/redis"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -58,11 +62,11 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2
return store.InsertEntry(ctx, entry)
}
-func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
@@ -80,7 +84,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2
return entry, nil
}
-func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
@@ -99,7 +103,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
return nil
}
-func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
if err != nil {
@@ -107,7 +111,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
}
for _, fileName := range members {
- path := filer2.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
@@ -117,10 +121,11 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil
}
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ dirListKey := genDirectoryListKey(string(fullpath))
+ members, err := store.Client.SMembers(dirListKey).Result()
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
@@ -154,11 +159,18 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
// fetch entry meta
for _, fileName := range members {
- path := filer2.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(fullpath), fileName)
entry, err := store.FindEntry(ctx, path)
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
} else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(string(path)).Result()
+ store.Client.SRem(dirListKey, fileName).Result()
+ continue
+ }
+ }
entries = append(entries, entry)
}
}
@@ -169,3 +181,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
+
+func (store *UniversalRedisStore) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer2/redis2/redis_cluster_store.go b/weed/filer2/redis2/redis_cluster_store.go
new file mode 100644
index 000000000..b252eabab
--- /dev/null
+++ b/weed/filer2/redis2/redis_cluster_store.go
@@ -0,0 +1,42 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &RedisCluster2Store{})
+}
+
+type RedisCluster2Store struct {
+ UniversalRedis2Store
+}
+
+func (store *RedisCluster2Store) GetName() string {
+ return "redis_cluster2"
+}
+
+func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", true)
+ configuration.SetDefault(prefix+"routeByLatency", true)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ )
+}
+
+func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ return
+}
diff --git a/weed/filer2/redis2/redis_store.go b/weed/filer2/redis2/redis_store.go
new file mode 100644
index 000000000..1e2a20043
--- /dev/null
+++ b/weed/filer2/redis2/redis_store.go
@@ -0,0 +1,36 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &Redis2Store{})
+}
+
+type Redis2Store struct {
+ UniversalRedis2Store
+}
+
+func (store *Redis2Store) GetName() string {
+ return "redis2"
+}
+
+func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ return
+}
diff --git a/weed/filer2/redis2/universal_redis_store.go b/weed/filer2/redis2/universal_redis_store.go
new file mode 100644
index 000000000..420336b46
--- /dev/null
+++ b/weed/filer2/redis2/universal_redis_store.go
@@ -0,0 +1,162 @@
+package redis2
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-redis/redis"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedis2Store struct {
+ Client redis.UniversalClient
+}
+
+func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+ if name != "" {
+ if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
+ return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
+
+ data, err := store.Client.Get(string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks([]byte(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ _, err = store.Client.Del(string(fullpath)).Result()
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ dir, name := fullpath.DirAndName()
+ if name != "" {
+ _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+
+ dirListKey := genDirectoryListKey(string(fullpath))
+ start := int64(0)
+ if startFileName != "" {
+ start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
+ if !inclusive {
+ start++
+ }
+ }
+ members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ // fetch entry meta
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(string(path)).Result()
+ store.Client.ZRem(dirListKey, fileName).Result()
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ }
+
+ return entries, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedis2Store) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 01b87cad1..033a8dd13 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -1,7 +1,10 @@
package filer2
import (
+ "bytes"
"io"
+ "math"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -9,7 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error {
+func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
chunkViews := ViewFromChunks(chunks, offset, size)
@@ -26,8 +29,9 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
}
for _, chunkView := range chunkViews {
+
urlString := fileId2Url[chunkView.FileId]
- _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
@@ -39,3 +43,157 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
return nil
}
+
+// ---------------- ReadAllReader ----------------------------------
+
+func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
+
+ buffer := bytes.Buffer{}
+
+ chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+
+ lookupFileId := func(fileId string) (targetUrl string, err error) {
+ return masterClient.LookupFileId(fileId)
+ }
+
+ for _, chunkView := range chunkViews {
+ urlString, err := lookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return nil, err
+ }
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return nil, err
+ }
+ }
+ return buffer.Bytes(), nil
+}
+
+// ---------------- ChunkStreamReader ----------------------------------
+type ChunkStreamReader struct {
+ chunkViews []*ChunkView
+ logicOffset int64
+ buffer []byte
+ bufferOffset int64
+ bufferPos int
+ chunkIndex int
+ lookupFileId LookupFileIdFunctionType
+}
+
+var _ = io.ReadSeeker(&ChunkStreamReader{})
+
+func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+
+ return &ChunkStreamReader{
+ chunkViews: chunkViews,
+ lookupFileId: func(fileId string) (targetUrl string, err error) {
+ return masterClient.LookupFileId(fileId)
+ },
+ }
+}
+
+func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+
+ return &ChunkStreamReader{
+ chunkViews: chunkViews,
+ lookupFileId: LookupFn(filerClient),
+ }
+}
+
+func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
+ for n < len(p) {
+ if c.isBufferEmpty() {
+ if c.chunkIndex >= len(c.chunkViews) {
+ return n, io.EOF
+ }
+ chunkView := c.chunkViews[c.chunkIndex]
+ c.fetchChunkToBuffer(chunkView)
+ c.chunkIndex++
+ }
+ t := copy(p[n:], c.buffer[c.bufferPos:])
+ c.bufferPos += t
+ n += t
+ }
+ return
+}
+
+func (c *ChunkStreamReader) isBufferEmpty() bool {
+ return len(c.buffer) <= c.bufferPos
+}
+
+func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
+
+ var totalSize int64
+ for _, chunk := range c.chunkViews {
+ totalSize += int64(chunk.Size)
+ }
+
+ var err error
+ switch whence {
+ case io.SeekStart:
+ case io.SeekCurrent:
+ offset += c.bufferOffset + int64(c.bufferPos)
+ case io.SeekEnd:
+ offset = totalSize + offset
+ }
+ if offset > totalSize {
+ err = io.ErrUnexpectedEOF
+ }
+
+ for i, chunk := range c.chunkViews {
+ if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+ if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
+ c.fetchChunkToBuffer(chunk)
+ c.chunkIndex = i + 1
+ break
+ }
+ }
+ }
+ c.bufferPos = int(offset - c.bufferOffset)
+
+ return offset, err
+
+}
+
+func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
+ urlString, err := c.lookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ var buffer bytes.Buffer
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ c.buffer = buffer.Bytes()
+ c.bufferPos = 0
+ c.bufferOffset = chunkView.LogicOffset
+
+ // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+
+ return nil
+}
+
+func (c *ChunkStreamReader) Close() {
+ // TODO try to release and reuse buffer
+}
+
+func VolumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}
diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go
deleted file mode 100644
index 24e05e3ad..000000000
--- a/weed/filer2/tikv/tikv_store.go
+++ /dev/null
@@ -1,251 +0,0 @@
-// +build !386
-// +build !arm
-
-package tikv
-
-import (
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-
- "github.com/pingcap/tidb/kv"
- "github.com/pingcap/tidb/store/tikv"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &TikvStore{})
-}
-
-type TikvStore struct {
- store kv.Storage
-}
-
-func (store *TikvStore) GetName() string {
- return "tikv"
-}
-
-func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
- pdAddr := configuration.GetString(prefix + "pdAddress")
- return store.initialize(pdAddr)
-}
-
-func (store *TikvStore) initialize(pdAddr string) (err error) {
- glog.Infof("filer store tikv pd address: %s", pdAddr)
-
- driver := tikv.Driver{}
-
- store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr))
-
- if err != nil {
- return fmt.Errorf("open tikv %s : %v", pdAddr, err)
- }
-
- return
-}
-
-func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- tx, err := store.store.Begin()
- if err != nil {
- return ctx, err
- }
- return context.WithValue(ctx, "tx", tx), nil
-}
-func (store *TikvStore) CommitTransaction(ctx context.Context) error {
- tx, ok := ctx.Value("tx").(kv.Transaction)
- if ok {
- return tx.Commit(ctx)
- }
- return nil
-}
-func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
- tx, ok := ctx.Value("tx").(kv.Transaction)
- if ok {
- return tx.Rollback()
- }
- return nil
-}
-
-func (store *TikvStore) getTx(ctx context.Context) kv.Transaction {
- if tx, ok := ctx.Value("tx").(kv.Transaction); ok {
- return tx
- }
- return nil
-}
-
-func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- dir, name := entry.DirAndName()
- key := genKey(dir, name)
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- err = store.getTx(ctx).Set(key, value)
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
-
- return nil
-}
-
-func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- dir, name := fullpath.DirAndName()
- key := genKey(dir, name)
-
- data, err := store.getTx(ctx).Get(ctx, key)
-
- if err == kv.ErrNotExist {
- return nil, filer2.ErrNotFound
- }
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
-
- return entry, nil
-}
-
-func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- dir, name := fullpath.DirAndName()
- key := genKey(dir, name)
-
- err = store.getTx(ctx).Delete(key)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- tx := store.getTx(ctx)
-
- iter, err := tx.Iter(directoryPrefix, nil)
- if err != nil {
- return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
- }
- defer iter.Close()
- for iter.Valid() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- iter.Next()
- continue
- }
-
- if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- iter.Next()
- }
-
- return nil
-}
-
-func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
- lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName)
-
- iter, err := store.getTx(ctx).Iter(lastFileStart, nil)
- if err != nil {
- return nil, fmt.Errorf("list %s: %v", fullpath, err)
- }
- defer iter.Close()
- for iter.Valid() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- iter.Next()
- continue
- }
- if fileName == startFileName && !inclusive {
- iter.Next()
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
- }
-
- // println("list", entry.FullPath, "chunks", len(entry.Chunks))
-
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- iter.Next()
- }
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string) (key []byte) {
- key = hashToBytes(dirPath)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = hashToBytes(string(fullpath))
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func getNameFromKey(key []byte) string {
-
- return string(key[md5.Size:])
-
-}
-
-// hash directory
-func hashToBytes(dir string) []byte {
- h := md5.New()
- io.WriteString(h, dir)
-
- b := h.Sum(nil)
-
- return b
-}
diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go
deleted file mode 100644
index daf29612e..000000000
--- a/weed/filer2/tikv/tikv_store_unsupported.go
+++ /dev/null
@@ -1,65 +0,0 @@
-// +build 386 arm
-
-package tikv
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &TikvStore{})
-}
-
-type TikvStore struct {
-}
-
-func (store *TikvStore) GetName() string {
- return "tikv"
-}
-
-func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) initialize(pdAddr string) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return nil, fmt.Errorf("not implemented for 32 bit computers")
-}
-func (store *TikvStore) CommitTransaction(ctx context.Context) error {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- return nil, fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
- return nil, fmt.Errorf("not implemented for 32 bit computers")
-}
diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go
new file mode 100644
index 000000000..9c6e5c88d
--- /dev/null
+++ b/weed/filer2/topics.go
@@ -0,0 +1,6 @@
+package filer2
+
+const (
+ TopicsDir = "/topics"
+ SystemLogDir = TopicsDir + "/.system/log"
+)