aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorHongyanShen <763987993@qq.com>2020-03-11 12:55:24 +0800
committerGitHub <noreply@github.com>2020-03-11 12:55:24 +0800
commit03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch)
treeed8833386a712c850dcef0815509774681a6ab56 /weed/filer2
parent0fca1ae776783b37481549df40f477b7d9248d3c (diff)
parent60f5f05c78a2918d5219c925cea5847759281a2c (diff)
downloadseaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz
seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go44
-rw-r--r--weed/filer2/abstract_sql/hashing.go32
-rw-r--r--weed/filer2/cassandra/cassandra_store.go26
-rw-r--r--weed/filer2/configuration.go3
-rw-r--r--weed/filer2/entry.go2
-rw-r--r--weed/filer2/entry_codec.go27
-rw-r--r--weed/filer2/etcd/etcd_store.go22
-rw-r--r--weed/filer2/filechunks.go36
-rw-r--r--weed/filer2/filechunks_test.go36
-rw-r--r--weed/filer2/filer.go159
-rw-r--r--weed/filer2/filer_buckets.go113
-rw-r--r--weed/filer2/filer_client_util.go94
-rw-r--r--weed/filer2/filer_delete_entry.go125
-rw-r--r--weed/filer2/filer_deletion.go40
-rw-r--r--weed/filer2/filerstore.go20
-rw-r--r--weed/filer2/fullpath.go6
-rw-r--r--weed/filer2/leveldb/leveldb_store.go42
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go6
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go42
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go6
-rw-r--r--weed/filer2/memdb/memdb_store.go132
-rw-r--r--weed/filer2/memdb/memdb_store_test.go149
-rw-r--r--weed/filer2/mysql/mysql_store.go25
-rw-r--r--weed/filer2/postgres/postgres_store.go19
-rw-r--r--weed/filer2/redis/redis_cluster_store.go20
-rw-r--r--weed/filer2/redis/redis_store.go8
-rw-r--r--weed/filer2/redis/universal_redis_store.go39
-rw-r--r--weed/filer2/stream.go3
-rw-r--r--weed/filer2/tikv/tikv_store.go40
-rw-r--r--weed/filer2/tikv/tikv_store_unsupported.go8
30 files changed, 752 insertions, 572 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 3e8554957..864c858d3 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -7,16 +7,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlListExclusive string
- SqlListInclusive string
+ DB *sql.DB
+ SqlInsert string
+ SqlUpdate string
+ SqlFind string
+ SqlDelete string
+ SqlDeleteFolderChildren string
+ SqlListExclusive string
+ SqlListInclusive string
}
type TxOrDB interface {
@@ -64,7 +67,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.En
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
if err != nil {
return fmt.Errorf("insert %s: %s", entry.FullPath, err)
}
@@ -84,7 +87,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err)
}
@@ -99,10 +102,10 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName()
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir)
+ row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
entry := &filer2.Entry{
@@ -119,7 +122,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
dir, name := fullpath.DirAndName()
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err)
}
@@ -132,6 +135,21 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
return nil
}
+func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
@@ -139,7 +157,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
sqlText = store.SqlListInclusive
}
- rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
+ rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), limit)
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
diff --git a/weed/filer2/abstract_sql/hashing.go b/weed/filer2/abstract_sql/hashing.go
deleted file mode 100644
index 5c982c537..000000000
--- a/weed/filer2/abstract_sql/hashing.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package abstract_sql
-
-import (
- "crypto/md5"
- "io"
-)
-
-// returns a 64 bit big int
-func hashToLong(dir string) (v int64) {
- h := md5.New()
- io.WriteString(h, dir)
-
- b := h.Sum(nil)
-
- v += int64(b[0])
- v <<= 8
- v += int64(b[1])
- v <<= 8
- v += int64(b[2])
- v <<= 8
- v += int64(b[3])
- v <<= 8
- v += int64(b[4])
- v <<= 8
- v += int64(b[5])
- v <<= 8
- v += int64(b[6])
- v <<= 8
- v += int64(b[7])
-
- return
-}
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index 466be5bf3..6f25fffec 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -3,10 +3,13 @@ package cassandra
import (
"context"
"fmt"
+
+ "github.com/gocql/gocql"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gocql/gocql"
)
func init() {
@@ -22,10 +25,10 @@ func (store *CassandraStore) GetName() string {
return "cassandra"
}
-func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
+func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
- configuration.GetString("keyspace"),
- configuration.GetStringSlice("hosts"),
+ configuration.GetString(prefix+"keyspace"),
+ configuration.GetStringSlice(prefix+"hosts"),
)
}
@@ -80,12 +83,12 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
dir, name).Consistency(gocql.One).Scan(&data); err != nil {
if err != gocql.ErrNotFound {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
}
if len(data) == 0 {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
entry = &filer2.Entry{
@@ -112,6 +115,17 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
return nil
}
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=?",
+ fullpath).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go
index 7b05b53dc..a174117ea 100644
--- a/weed/filer2/configuration.go
+++ b/weed/filer2/configuration.go
@@ -17,8 +17,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
for _, store := range Stores {
if config.GetBool(store.GetName() + ".enabled") {
- viperSub := config.Sub(store.GetName())
- if err := store.Initialize(viperSub); err != nil {
+ if err := store.Initialize(config, store.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
store.GetName(), err)
}
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index 3f8a19114..c901927bb 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -30,6 +30,7 @@ type Entry struct {
FullPath
Attr
+ Extended map[string][]byte
// the following is for files
Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
@@ -56,6 +57,7 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
IsDirectory: entry.IsDirectory(),
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
}
}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index cf4627b74..3a2dc6134 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -1,18 +1,21 @@
package filer2
import (
+ "bytes"
+ "fmt"
"os"
"time"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
message := &filer_pb.Entry{
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
}
return proto.Marshal(message)
}
@@ -27,6 +30,8 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
entry.Attr = PbToEntryAttribute(message.Attributes)
+ entry.Extended = message.Extended
+
entry.Chunks = message.Chunks
return nil
@@ -84,6 +89,10 @@ func EqualEntry(a, b *Entry) bool {
return false
}
+ if !eq(a.Extended, b.Extended) {
+ return false
+ }
+
for i := 0; i < len(a.Chunks); i++ {
if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
return false
@@ -91,3 +100,17 @@ func EqualEntry(a, b *Entry) bool {
}
return true
}
+
+func eq(a, b map[string][]byte) bool {
+ if len(a) != len(b) {
+ return false
+ }
+
+ for k, v := range a {
+ if w, ok := b[k]; !ok || !bytes.Equal(v, w) {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
index 1b0f928d0..83a6ddc5d 100644
--- a/weed/filer2/etcd/etcd_store.go
+++ b/weed/filer2/etcd/etcd_store.go
@@ -6,10 +6,12 @@ import (
"strings"
"time"
+ "go.etcd.io/etcd/clientv3"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
- "go.etcd.io/etcd/clientv3"
)
const (
@@ -28,13 +30,13 @@ func (store *EtcdStore) GetName() string {
return "etcd"
}
-func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) {
- servers := configuration.GetString("servers")
+func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ servers := configuration.GetString(prefix + "servers")
if servers == "" {
servers = "localhost:2379"
}
- timeout := configuration.GetString("timeout")
+ timeout := configuration.GetString(prefix + "timeout")
if timeout == "" {
timeout = "3s"
}
@@ -99,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath)
}
if len(resp.Kvs) == 0 {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
entry = &filer2.Entry{
@@ -123,6 +125,16 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return nil
}
+func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
+ return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *EtcdStore) ListDirectoryEntries(
ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index b5876df82..711488df1 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -71,6 +71,8 @@ type ChunkView struct {
Size uint64
LogicOffset int64
IsFullChunk bool
+ CipherKey []byte
+ isGzipped bool
}
func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
@@ -86,6 +88,7 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
stop := offset + int64(size)
for _, chunk := range visibles {
+
if chunk.start <= offset && offset < chunk.stop && offset < stop {
isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
views = append(views, &ChunkView{
@@ -94,6 +97,8 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
Size: uint64(min(chunk.stop, stop) - offset),
LogicOffset: offset,
IsFullChunk: isFullChunk,
+ CipherKey: chunk.cipherKey,
+ isGzipped: chunk.isGzipped,
})
offset = min(chunk.stop, stop)
}
@@ -120,13 +125,7 @@ var bufPool = sync.Pool{
func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
- newV := newVisibleInterval(
- chunk.Offset,
- chunk.Offset+int64(chunk.Size),
- chunk.GetFileIdString(),
- chunk.Mtime,
- true,
- )
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped)
length := len(visibles)
if length == 0 {
@@ -140,23 +139,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
logPrintf(" before", visibles)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(
- v.start,
- chunk.Offset,
- v.fileId,
- v.modifiedTime,
- false,
- ))
+ newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
}
chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false,
- ))
+ newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
@@ -187,6 +174,7 @@ func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []Vi
var newVisibles []VisibleInterval
for _, chunk := range chunks {
+
newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
t := visibles[:0]
visibles = newVisibles
@@ -208,15 +196,19 @@ type VisibleInterval struct {
modifiedTime int64
fileId string
isFullChunk bool
+ cipherKey []byte
+ isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTime: modifiedTime,
isFullChunk: isFullChunk,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
}
}
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index e75e60753..bb4a6c74d 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -331,6 +331,42 @@ func TestChunksReading(t *testing.T) {
{Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
},
},
+ // case 8: edge cases
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 90, Size: 200, FileId: "asdf", Mtime: 134},
+ {Offset: 190, Size: 300, FileId: "fsad", Mtime: 353},
+ },
+ Offset: 0,
+ Size: 300,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0},
+ {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90},
+ {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190},
+ },
+ },
+ // case 9: edge cases
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1},
+ {Offset: 43175936, Size: 52981771 - 43175936, FileId: "2,112a36ea7f85", Mtime: 2},
+ {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3},
+ {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4},
+ {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5},
+ {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6},
+ },
+ Offset: 0,
+ Size: 153578836,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0},
+ {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936},
+ {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760},
+ {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736},
+ {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168},
+ {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248},
+ },
+ },
}
for i, testcase := range testcases {
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 672295dea..d3343f610 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -3,37 +3,46 @@ package filer2
import (
"context"
"fmt"
- "google.golang.org/grpc"
- "math"
"os"
"path/filepath"
"strings"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/karlseguin/ccache"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/karlseguin/ccache"
)
+const PaginationSize = 1024 * 256
+
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
)
type Filer struct {
- store *FilerStoreWrapper
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
- fileIdDeletionChan chan string
- GrpcDialOption grpc.DialOption
+ store *FilerStoreWrapper
+ directoryCache *ccache.Cache
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionQueue *util.UnboundedQueue
+ GrpcDialOption grpc.DialOption
+ DirBucketsPath string
+ DirQueuesPath string
+ buckets *FilerBuckets
+ Cipher bool
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32) *Filer {
f := &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
- fileIdDeletionChan: make(chan string, 4096),
- GrpcDialOption: grpcDialOption,
+ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters),
+ fileIdDeletionQueue: util.NewUnboundedQueue(),
+ GrpcDialOption: grpcDialOption,
}
go f.loopProcessingDeletion()
@@ -69,7 +78,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.store.RollbackTransaction(ctx)
}
-func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) error {
if string(entry.FullPath) == "/" {
return nil
@@ -93,7 +102,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
glog.V(4).Infof("find uncached directory: %s", dirPath)
dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
} else {
- glog.V(4).Infof("found cached directory: %s", dirPath)
+ // glog.V(4).Infof("found cached directory: %s", dirPath)
}
// no such existing directory
@@ -105,25 +114,30 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
dirEntry = &Entry{
FullPath: FullPath(dirPath),
Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0770,
- Uid: entry.Uid,
- Gid: entry.Gid,
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | 0770,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
},
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
+ if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
+ f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(nil, dirEntry, false)
}
} else if !dirEntry.IsDirectory() {
+ glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
return fmt.Errorf("%s is a file", dirPath)
}
@@ -138,6 +152,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
}
if lastDirectoryEntry == nil {
+ glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
return fmt.Errorf("parent folder not found: %v", entry.FullPath)
}
@@ -151,22 +166,30 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
+ glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl)
if oldEntry == nil {
if err := f.store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
+ if o_excl {
+ glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
+ return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
+ }
if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
glog.Errorf("update entry %s: %v", entry.FullPath, err)
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
+ f.maybeAddBucket(entry)
f.NotifyUpdateEvent(oldEntry, entry, true)
f.deleteChunksIfNotNew(oldEntry, entry)
+ glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
+
return nil
}
@@ -200,75 +223,51 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
},
}, nil
}
- return f.store.FindEntry(ctx, p)
-}
-
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
- entry, err := f.FindEntry(ctx, p)
- if err != nil {
- return err
- }
-
- if entry.IsDirectory() {
- limit := int(1)
- if isRecursive {
- limit = math.MaxInt32
- }
- lastFileName := ""
- includeLastFile := false
- for limit > 0 {
- entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024)
- if err != nil {
- glog.Errorf("list folder %s: %v", p, err)
- return fmt.Errorf("list folder %s: %v", p, err)
- }
-
- if len(entries) == 0 {
- break
- }
-
- if isRecursive {
- for _, sub := range entries {
- lastFileName = sub.Name()
- err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
- if err != nil && !ignoreRecursiveError {
- return err
- }
- limit--
- if limit <= 0 {
- break
- }
- }
- }
-
- if len(entries) < 1024 {
- break
- }
+ entry, err = f.store.FindEntry(ctx, p)
+ if entry != nil && entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ return nil, filer_pb.ErrNotFound
}
-
- f.cacheDelDirectory(string(p))
-
}
+ return
- if shouldDeleteChunks {
- f.DeleteChunks(p, entry.Chunks)
- }
+}
- if p == "/" {
- return nil
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+ if strings.HasSuffix(string(p), "/") && len(p) > 1 {
+ p = p[0 : len(p)-1]
}
- glog.V(3).Infof("deleting entry %v", p)
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+ var makeupEntries []*Entry
+ entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ for expiredCount > 0 && err == nil {
+ makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount)
+ if err == nil {
+ entries = append(entries, makeupEntries...)
+ }
+ }
- return f.store.DeleteEntry(ctx, p)
+ return entries, err
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
- if strings.HasSuffix(string(p), "/") && len(p) > 1 {
- p = p[0 : len(p)-1]
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
+ listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ if listErr != nil {
+ return listedEntries, expiredCount, "", listErr
}
- return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ for _, entry := range listedEntries {
+ lastFileName = entry.Name()
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ expiredCount++
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ return
}
func (f *Filer) cacheDelDirectory(dirpath string) {
diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go
new file mode 100644
index 000000000..601b7dbf3
--- /dev/null
+++ b/weed/filer2/filer_buckets.go
@@ -0,0 +1,113 @@
+package filer2
+
+import (
+ "context"
+ "math"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type BucketName string
+type BucketOption struct {
+ Name BucketName
+ Replication string
+}
+type FilerBuckets struct {
+ dirBucketsPath string
+ buckets map[BucketName]*BucketOption
+ sync.RWMutex
+}
+
+func (f *Filer) LoadBuckets(dirBucketsPath string) {
+
+ f.buckets = &FilerBuckets{
+ buckets: make(map[BucketName]*BucketOption),
+ }
+ f.DirBucketsPath = dirBucketsPath
+
+ limit := math.MaxInt32
+
+ entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
+
+ if err != nil {
+ glog.V(1).Infof("no buckets found: %v", err)
+ return
+ }
+
+ glog.V(1).Infof("buckets found: %d", len(entries))
+
+ f.buckets.Lock()
+ for _, entry := range entries {
+ f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
+ Name: BucketName(entry.Name()),
+ Replication: entry.Replication,
+ }
+ }
+ f.buckets.Unlock()
+
+}
+
+func (f *Filer) ReadBucketOption(buketName string) (replication string) {
+
+ f.buckets.RLock()
+ defer f.buckets.RUnlock()
+
+ option, found := f.buckets.buckets[BucketName(buketName)]
+
+ if !found {
+ return ""
+ }
+ return option.Replication
+
+}
+
+func (f *Filer) isBucket(entry *Entry) bool {
+ if !entry.IsDirectory() {
+ return false
+ }
+ parent, dirName := entry.FullPath.DirAndName()
+ if parent != f.DirBucketsPath {
+ return false
+ }
+
+ f.buckets.RLock()
+ defer f.buckets.RUnlock()
+
+ _, found := f.buckets.buckets[BucketName(dirName)]
+
+ return found
+
+}
+
+func (f *Filer) maybeAddBucket(entry *Entry) {
+ if !entry.IsDirectory() {
+ return
+ }
+ parent, dirName := entry.FullPath.DirAndName()
+ if parent != f.DirBucketsPath {
+ return
+ }
+ f.addBucket(dirName, &BucketOption{
+ Name: BucketName(dirName),
+ Replication: entry.Replication,
+ })
+}
+
+func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {
+
+ f.buckets.Lock()
+ defer f.buckets.Unlock()
+
+ f.buckets.buckets[BucketName(buketName)] = bucketOption
+
+}
+
+func (f *Filer) deleteBucket(buketName string) {
+
+ f.buckets.Lock()
+ defer f.buckets.Unlock()
+
+ delete(f.buckets.buckets, BucketName(buketName))
+
+}
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
index 7e093eea2..1c1fa6a5b 100644
--- a/weed/filer2/filer_client_util.go
+++ b/weed/filer2/filer_client_util.go
@@ -3,6 +3,8 @@ package filer2
import (
"context"
"fmt"
+ "io"
+ "math"
"strings"
"sync"
@@ -20,10 +22,11 @@ func VolumeId(fileId string) string {
}
type FilerClient interface {
- WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error
+ WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
+ AdjustedUrl(hostAndPort string) string
}
-func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
+func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
var vids []string
for _, chunkView := range chunkViews {
vids = append(vids, VolumeId(chunkView.FileId))
@@ -31,10 +34,10 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s
vid2Locations := make(map[string]*filer_pb.Locations)
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})
if err != nil {
@@ -65,20 +68,16 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s
return
}
+ volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
var n int64
- n, err = util.ReadUrl(
- fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
- chunkView.Offset,
- int(chunkView.Size),
- buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
- !chunkView.IsFullChunk)
+ n, err = util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)])
if err != nil {
- glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
+ glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
err = fmt.Errorf("failed to read http://%s/%s: %v",
- locations.Locations[0].Url, chunkView.FileId, err)
+ volumeServerAddress, chunkView.FileId, err)
return
}
@@ -91,68 +90,75 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s
return
}
-func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) {
+func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
- dir, name := FullPath(fullFilePath).DirAndName()
+ dir, name := fullFilePath.DirAndName()
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
}
- glog.V(3).Infof("read %s request: %v", fullFilePath, request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
+ // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
+ resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
- if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
+ if err == filer_pb.ErrNotFound {
return nil
}
- glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err)
+ glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
return err
}
- if resp.Entry != nil {
- entry = resp.Entry
+ if resp.Entry == nil {
+ // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
+ return nil
}
+ entry = resp.Entry
return nil
})
return
}
-func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) {
+func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- paginationLimit := 1024
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
lastEntryName := ""
- for {
-
- request := &filer_pb.ListEntriesRequest{
- Directory: fullDirPath,
- StartFromFileName: lastEntryName,
- Limit: uint32(paginationLimit),
- }
+ request := &filer_pb.ListEntriesRequest{
+ Directory: string(fullDirPath),
+ Prefix: prefix,
+ StartFromFileName: lastEntryName,
+ Limit: math.MaxUint32,
+ }
- glog.V(3).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
+ glog.V(3).Infof("read directory: %v", request)
+ stream, err := client.ListEntries(context.Background(), request)
+ if err != nil {
+ return fmt.Errorf("list %s: %v", fullDirPath, err)
+ }
- for _, entry := range resp.Entries {
- fn(entry)
- lastEntryName = entry.Name
+ var prevEntry *filer_pb.Entry
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ if prevEntry != nil {
+ fn(prevEntry, true)
+ }
+ break
+ } else {
+ return recvErr
+ }
}
-
- if len(resp.Entries) < paginationLimit {
- break
+ if prevEntry != nil {
+ fn(prevEntry, false)
}
-
+ prevEntry = resp.Entry
}
return nil
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
new file mode 100644
index 000000000..d0792ac66
--- /dev/null
+++ b/weed/filer2/filer_delete_entry.go
@@ -0,0 +1,125 @@
+package filer2
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
+ if p == "/" {
+ return nil
+ }
+
+ entry, findErr := f.FindEntry(ctx, p)
+ if findErr != nil {
+ return findErr
+ }
+
+ isCollection := f.isBucket(entry)
+
+ var chunks []*filer_pb.FileChunk
+ chunks = append(chunks, entry.Chunks...)
+ if entry.IsDirectory() {
+ // delete the folder children, not including the folder itself
+ var dirChunks []*filer_pb.FileChunk
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection)
+ if err != nil {
+ glog.V(0).Infof("delete directory %s: %v", p, err)
+ return fmt.Errorf("delete directory %s: %v", p, err)
+ }
+ chunks = append(chunks, dirChunks...)
+ }
+
+ // delete the file or folder
+ err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
+ if err != nil {
+ return fmt.Errorf("delete file %s: %v", p, err)
+ }
+
+ if shouldDeleteChunks && !isCollection {
+ go f.DeleteChunks(chunks)
+ }
+ if isCollection {
+ collectionName := entry.Name()
+ f.doDeleteCollection(collectionName)
+ f.deleteBucket(collectionName)
+ }
+
+ return nil
+}
+
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+ entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
+
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ if sub.IsDirectory() {
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ chunks = append(chunks, dirChunks...)
+ } else {
+ chunks = append(chunks, sub.Chunks...)
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, err
+ }
+ }
+
+ if len(entries) < PaginationSize {
+ break
+ }
+ }
+
+ f.cacheDelDirectory(string(entry.FullPath))
+
+ glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
+
+ if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
+ return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return chunks, nil
+}
+
+func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
+
+ glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
+
+ if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return nil
+}
+
+func (f *Filer) doDeleteCollection(collectionName string) (err error) {
+
+ return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: collectionName,
+ })
+ if err != nil {
+ glog.Infof("delete collection %s: %v", collectionName, err)
+ }
+ return err
+ })
+
+}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 25e27e504..3a64f636e 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -10,8 +10,6 @@ import (
func (f *Filer) loopProcessingDeletion() {
- ticker := time.NewTicker(5 * time.Second)
-
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
@@ -31,37 +29,35 @@ func (f *Filer) loopProcessingDeletion() {
return m, nil
}
- var fileIds []string
+ var deletionCount int
for {
- select {
- case fid := <-f.fileIdDeletionChan:
- fileIds = append(fileIds, fid)
- if len(fileIds) >= 4096 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
+ deletionCount = 0
+ f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+ deletionCount = len(fileIds)
+ _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
+ if err != nil {
+ glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ } else {
+ glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
}
+ })
+
+ if deletionCount == 0 {
+ time.Sleep(1123 * time.Millisecond)
}
}
}
-func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
+func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
- f.fileIdDeletionChan <- chunk.GetFileIdString()
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
}
}
// DeleteFileByFileId direct delete by file id.
// Only used when the fileId is not being managed by snapshots.
func (f *Filer) DeleteFileByFileId(fileId string) {
- f.fileIdDeletionChan <- fileId
+ f.fileIdDeletionQueue.EnQueue(fileId)
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
@@ -70,7 +66,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
return
}
if newEntry == nil {
- f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks)
+ f.DeleteChunks(oldEntry.Chunks)
}
var toDelete []*filer_pb.FileChunk
@@ -84,5 +80,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(oldEntry.FullPath, toDelete)
+ f.DeleteChunks(toDelete)
}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index 8caa44ee2..f724f79c2 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -2,7 +2,6 @@ package filer2
import (
"context"
- "errors"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -14,12 +13,13 @@ type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
InsertEntry(context.Context, *Entry) error
UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found
FindEntry(context.Context, FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, FullPath) (err error)
+ DeleteFolderChildren(context.Context, FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error)
@@ -27,8 +27,6 @@ type FilerStore interface {
RollbackTransaction(ctx context.Context) error
}
-var ErrNotFound = errors.New("filer: no entry is found in filer store")
-
type FilerStoreWrapper struct {
actualStore FilerStore
}
@@ -46,8 +44,8 @@ func (fsw *FilerStoreWrapper) GetName() string {
return fsw.actualStore.GetName()
}
-func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error {
- return fsw.actualStore.Initialize(configuration)
+func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
+ return fsw.actualStore.Initialize(configuration, prefix)
}
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
@@ -97,6 +95,16 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
return fsw.actualStore.DeleteEntry(ctx, fp)
}
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
+ }()
+
+ return fsw.actualStore.DeleteFolderChildren(ctx, fp)
+}
+
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
start := time.Now()
diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go
index 191e51cf3..133069f93 100644
--- a/weed/filer2/fullpath.go
+++ b/weed/filer2/fullpath.go
@@ -3,6 +3,8 @@ package filer2
import (
"path/filepath"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type FullPath string
@@ -34,3 +36,7 @@ func (fp FullPath) Child(name string) FullPath {
}
return FullPath(dir + "/" + name)
}
+
+func (fp FullPath) AsInode() uint64 {
+ return uint64(util.HashStringToLong(string(fp)))
+}
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index d00eba859..807fcb56f 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -5,12 +5,14 @@ import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -29,8 +31,8 @@ func (store *LevelDBStore) GetName() string {
return "leveldb"
}
-func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) {
- dir := configuration.GetString("dir")
+func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
return store.initialize(dir)
}
@@ -93,7 +95,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa
data, err := store.db.Get(key, nil)
if err == leveldb.ErrNotFound {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
@@ -123,6 +125,34 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
return nil
}
+func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ batch := new(leveldb.Batch)
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete([]byte(genKey(string(fullpath), fileName)))
+ }
+ iter.Release()
+
+ err = store.db.Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 904de8c97..497158420 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -9,7 +9,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, 0)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -30,7 +30,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := filer.CreateEntry(ctx, entry1); err != nil {
+ if err := filer.CreateEntry(ctx, entry1, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -64,7 +64,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, 0)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
index 4b47d2eb3..0b07c6833 100644
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -8,12 +8,14 @@ import (
"io"
"os"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -29,8 +31,8 @@ func (store *LevelDB2Store) GetName() string {
return "leveldb2"
}
-func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) {
- dir := configuration.GetString("dir")
+func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
return store.initialize(dir, 8)
}
@@ -103,7 +105,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP
data, err := store.dbs[partitionId].Get(key, nil)
if err == leveldb.ErrNotFound {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
@@ -134,6 +136,34 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
return nil
}
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+
+ batch := new(leveldb.Batch)
+
+ iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(append(directoryPrefix, []byte(fileName)...))
+ }
+ iter.Release()
+
+ err = store.dbs[partitionId].Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index e28ef7dac..dc94f2ac7 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -9,7 +9,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, 0)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -30,7 +30,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := filer.CreateEntry(ctx, entry1); err != nil {
+ if err := filer.CreateEntry(ctx, entry1, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -64,7 +64,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
+ filer := filer2.NewFiler(nil, nil, 0)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
deleted file mode 100644
index 9c10a5472..000000000
--- a/weed/filer2/memdb/memdb_store.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package memdb
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/google/btree"
- "strings"
- "sync"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &MemDbStore{})
-}
-
-type MemDbStore struct {
- tree *btree.BTree
- treeLock sync.Mutex
-}
-
-type entryItem struct {
- *filer2.Entry
-}
-
-func (a entryItem) Less(b btree.Item) bool {
- return strings.Compare(string(a.FullPath), string(b.(entryItem).FullPath)) < 0
-}
-
-func (store *MemDbStore) GetName() string {
- return "memory"
-}
-
-func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) {
- store.tree = btree.New(8)
- return nil
-}
-
-func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *MemDbStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *MemDbStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- // println("inserting", entry.FullPath)
- store.treeLock.Lock()
- store.tree.ReplaceOrInsert(entryItem{entry})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
- return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
- }
- store.treeLock.Lock()
- store.tree.ReplaceOrInsert(entryItem{entry})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
- if item == nil {
- return nil, filer2.ErrNotFound
- }
- entry = item.(entryItem).Entry
- return entry, nil
-}
-
-func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- store.treeLock.Lock()
- store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
-
- startFrom := string(fullpath)
- if startFileName != "" {
- startFrom = startFrom + "/" + startFileName
- }
-
- store.tree.AscendGreaterOrEqual(entryItem{&filer2.Entry{FullPath: filer2.FullPath(startFrom)}},
- func(item btree.Item) bool {
- if limit <= 0 {
- return false
- }
- entry := item.(entryItem).Entry
- // println("checking", entry.FullPath)
-
- if entry.FullPath == fullpath {
- // skipping the current directory
- // println("skipping the folder", entry.FullPath)
- return true
- }
-
- dir, name := entry.FullPath.DirAndName()
- if name == startFileName {
- if inclusive {
- limit--
- entries = append(entries, entry)
- }
- return true
- }
-
- // only iterate the same prefix
- if !strings.HasPrefix(string(entry.FullPath), string(fullpath)) {
- // println("breaking from", entry.FullPath)
- return false
- }
-
- if dir != string(fullpath) {
- // this could be items in deeper directories
- // println("skipping deeper folder", entry.FullPath)
- return true
- }
- // now process the directory items
- // println("adding entry", entry.FullPath)
- limit--
- entries = append(entries, entry)
- return true
- },
- )
- return entries, nil
-}
diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go
deleted file mode 100644
index 3fd806aeb..000000000
--- a/weed/filer2/memdb/memdb_store_test.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package memdb
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "testing"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- store := &MemDbStore{}
- store.Initialize(nil)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
-}
-
-func TestCreateFileAndList(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- store := &MemDbStore{}
- store.Initialize(nil)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"),
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- entry2 := &filer2.Entry{
- FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"),
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- filer.CreateEntry(ctx, entry1)
- filer.CreateEntry(ctx, entry2)
-
- // checking the 2 files
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100)
-
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
-
- if len(entries) != 2 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- if entries[0].FullPath != entry1.FullPath {
- t.Errorf("find wrong entry 1: %v", entries[0].FullPath)
- return
- }
-
- if entries[1].FullPath != entry2.FullPath {
- t.Errorf("find wrong entry 2: %v", entries[1].FullPath)
- return
- }
-
- // checking the offset
- entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking root directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // add file3
- file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg")
- entry3 := &filer2.Entry{
- FullPath: file3Path,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
- filer.CreateEntry(ctx, entry3)
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 2 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // delete file and count
- filer.DeleteEntryMetaAndData(ctx, file3Path, false, false, false)
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go
index e18299bd2..63d99cd9d 100644
--- a/weed/filer2/mysql/mysql_store.go
+++ b/weed/filer2/mysql/mysql_store.go
@@ -26,28 +26,35 @@ func (store *MysqlStore) GetName() string {
return "mysql"
}
-func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) {
+func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
- configuration.GetString("username"),
- configuration.GetString("password"),
- configuration.GetString("hostname"),
- configuration.GetInt("port"),
- configuration.GetString("database"),
- configuration.GetInt("connection_max_idle"),
- configuration.GetInt("connection_max_open"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetBool(prefix+"interpolateParams"),
)
}
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int) (err error) {
+func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
+ interpolateParams bool) (err error) {
store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?"
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ if interpolateParams {
+ sqlUrl += "&interpolateParams=true"
+ }
+
var dbErr error
store.DB, dbErr = sql.Open("mysql", sqlUrl)
if dbErr != nil {
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
index ffd3d1e01..27a0c2513 100644
--- a/weed/filer2/postgres/postgres_store.go
+++ b/weed/filer2/postgres/postgres_store.go
@@ -26,16 +26,16 @@ func (store *PostgresStore) GetName() string {
return "postgres"
}
-func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) {
+func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
- configuration.GetString("username"),
- configuration.GetString("password"),
- configuration.GetString("hostname"),
- configuration.GetInt("port"),
- configuration.GetString("database"),
- configuration.GetString("sslmode"),
- configuration.GetInt("connection_max_idle"),
- configuration.GetInt("connection_max_open"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetString(prefix+"sslmode"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
)
}
@@ -45,6 +45,7 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go
index 11c315391..eaaecb740 100644
--- a/weed/filer2/redis/redis_cluster_store.go
+++ b/weed/filer2/redis/redis_cluster_store.go
@@ -18,17 +18,25 @@ func (store *RedisClusterStore) GetName() string {
return "redis_cluster"
}
-func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) {
+func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", true)
+ configuration.SetDefault(prefix+"routeByLatency", true)
+
return store.initialize(
- configuration.GetStringSlice("addresses"),
- configuration.GetString("password"),
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
)
}
-func (store *RedisClusterStore) initialize(addresses []string, password string) (err error) {
+func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: addresses,
- Password: password,
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
})
return
}
diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go
index c56fa014c..9debdb070 100644
--- a/weed/filer2/redis/redis_store.go
+++ b/weed/filer2/redis/redis_store.go
@@ -18,11 +18,11 @@ func (store *RedisStore) GetName() string {
return "redis"
}
-func (store *RedisStore) Initialize(configuration util.Configuration) (err error) {
+func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
- configuration.GetString("address"),
- configuration.GetString("password"),
- configuration.GetInt("database"),
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
)
}
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index ce41d4d70..c5b9d9416 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -3,12 +3,15 @@ package redis
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/go-redis/redis"
"sort"
"strings"
"time"
+
+ "github.com/go-redis/redis"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
const (
@@ -62,7 +65,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2
data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
@@ -99,10 +102,29 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
return nil
}
+func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := filer2.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ dirListKey := genDirectoryListKey(string(fullpath))
+ members, err := store.Client.SMembers(dirListKey).Result()
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
@@ -141,6 +163,13 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
} else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(string(path)).Result()
+ store.Client.SRem(dirListKey, fileName).Result()
+ continue
+ }
+ }
entries = append(entries, entry)
}
}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 01b87cad1..381d99144 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -26,8 +26,9 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
}
for _, chunkView := range chunkViews {
+
urlString := fileId2Url[chunkView.FileId]
- _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go
index 0e6b93c86..2a9dd6648 100644
--- a/weed/filer2/tikv/tikv_store.go
+++ b/weed/filer2/tikv/tikv_store.go
@@ -1,5 +1,6 @@
// +build !386
// +build !arm
+// +build !windows
package tikv
@@ -12,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/pingcap/tidb/kv"
@@ -30,8 +32,8 @@ func (store *TikvStore) GetName() string {
return "tikv"
}
-func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
- pdAddr := configuration.GetString("pdAddress")
+func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ pdAddr := configuration.GetString(prefix + "pdAddress")
return store.initialize(pdAddr)
}
@@ -110,7 +112,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath)
data, err := store.getTx(ctx).Get(ctx, key)
if err == kv.ErrNotExist {
- return nil, filer2.ErrNotFound
+ return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
@@ -141,6 +143,38 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return nil
}
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ tx := store.getTx(ctx)
+
+ iter, err := tx.Iter(directoryPrefix, nil)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ iter.Next()
+ continue
+ }
+
+ if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ iter.Next()
+ }
+
+ return nil
+}
+
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go
index 95c8b2dad..713c84bf8 100644
--- a/weed/filer2/tikv/tikv_store_unsupported.go
+++ b/weed/filer2/tikv/tikv_store_unsupported.go
@@ -1,4 +1,4 @@
-// +build 386 arm
+// +build 386 arm windows
package tikv
@@ -21,7 +21,7 @@ func (store *TikvStore) GetName() string {
return "tikv"
}
-func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
+func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
return fmt.Errorf("not implemented for 32 bit computers")
}
@@ -55,6 +55,10 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return fmt.Errorf("not implemented for 32 bit computers")
}
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
return nil, fmt.Errorf("not implemented for 32 bit computers")