path: root/weed/filer
author    Chris Lu <chris.lu@gmail.com>  2020-09-01 00:21:19 -0700
committer Chris Lu <chris.lu@gmail.com>  2020-09-01 00:21:19 -0700
commit    eb7929a9714d5d4ea8d9d70f58198b09bc459ead (patch)
tree      46a4662722f8bf7c6d771beef8d59a6f78a53b4f /weed/filer
parent    38e06d783d0a910c3df8e22bd097d3409e5d5312 (diff)
rename filer2 to filer
Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go   206
-rw-r--r--  weed/filer/cassandra/README.txt                  14
-rw-r--r--  weed/filer/cassandra/cassandra_store.go         163
-rw-r--r--  weed/filer/configuration.go                      50
-rw-r--r--  weed/filer/entry.go                              91
-rw-r--r--  weed/filer/entry_codec.go                       124
-rw-r--r--  weed/filer/etcd/etcd_store.go                   204
-rw-r--r--  weed/filer/filechunk_manifest.go                168
-rw-r--r--  weed/filer/filechunk_manifest_test.go           113
-rw-r--r--  weed/filer/filechunks.go                        284
-rw-r--r--  weed/filer/filechunks2_test.go                   46
-rw-r--r--  weed/filer/filechunks_test.go                   539
-rw-r--r--  weed/filer/filer.go                             290
-rw-r--r--  weed/filer/filer_buckets.go                     121
-rw-r--r--  weed/filer/filer_delete_entry.go                129
-rw-r--r--  weed/filer/filer_deletion.go                    109
-rw-r--r--  weed/filer/filer_notify.go                      169
-rw-r--r--  weed/filer/filer_notify_append.go                73
-rw-r--r--  weed/filer/filer_notify_test.go                  53
-rw-r--r--  weed/filer/filerstore.go                        208
-rw-r--r--  weed/filer/leveldb/leveldb_store.go             231
-rw-r--r--  weed/filer/leveldb/leveldb_store_test.go         88
-rw-r--r--  weed/filer/leveldb2/leveldb2_local_store.go      43
-rw-r--r--  weed/filer/leveldb2/leveldb2_store.go           251
-rw-r--r--  weed/filer/leveldb2/leveldb2_store_test.go       88
-rw-r--r--  weed/filer/meta_aggregator.go                   131
-rw-r--r--  weed/filer/meta_replay.go                        37
-rw-r--r--  weed/filer/mongodb/mongodb_store.go             214
-rw-r--r--  weed/filer/mysql/mysql_store.go                  74
-rw-r--r--  weed/filer/permission.go                         22
-rw-r--r--  weed/filer/postgres/README.txt                   17
-rw-r--r--  weed/filer/postgres/postgres_store.go            75
-rw-r--r--  weed/filer/reader_at.go                         149
-rw-r--r--  weed/filer/reader_at_test.go                    156
-rw-r--r--  weed/filer/redis/redis_cluster_store.go          42
-rw-r--r--  weed/filer/redis/redis_store.go                  36
-rw-r--r--  weed/filer/redis/universal_redis_store.go       191
-rw-r--r--  weed/filer/redis2/redis_cluster_store.go         42
-rw-r--r--  weed/filer/redis2/redis_store.go                 36
-rw-r--r--  weed/filer/redis2/universal_redis_store.go      166
-rw-r--r--  weed/filer/stream.go                            204
-rw-r--r--  weed/filer/topics.go                              6
42 files changed, 5453 insertions(+), 0 deletions(-)
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
new file mode 100644
index 000000000..a6de2ea39
--- /dev/null
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -0,0 +1,206 @@
+package abstract_sql
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type AbstractSqlStore struct {
+ DB *sql.DB
+ SqlInsert string
+ SqlUpdate string
+ SqlFind string
+ SqlDelete string
+ SqlDeleteFolderChildren string
+ SqlListExclusive string
+ SqlListInclusive string
+}
+
+type TxOrDB interface {
+ ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
+ QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+ QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+}
+
+func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
+ Isolation: sql.LevelReadCommitted,
+ ReadOnly: false,
+ })
+ if err != nil {
+ return ctx, err
+ }
+
+ return context.WithValue(ctx, "tx", tx), nil
+}
+func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx.Commit()
+ }
+ return nil
+}
+func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx
+ }
+ return store.DB
+}
+
+func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
+ if err != nil {
+ return fmt.Errorf("insert %s: %s", entry.FullPath, err)
+ }
+
+ affectedRows, err := res.RowsAffected()
+ if err == nil && affectedRows > 0 {
+ return nil
+ }
+
+ // now the insert failed possibly due to duplication constraints
+ glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err)
+
+ res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ if err != nil {
+ return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err)
+ }
+ return nil
+
+}
+
+func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ if err != nil {
+ return fmt.Errorf("update %s: %s", entry.FullPath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
+ }
+ return nil
+}
+
+func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
+
+ dir, name := fullpath.DirAndName()
+ row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
+ var data []byte
+ if err := row.Scan(&data); err != nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry := &filer.Entry{
+ FullPath: fullpath,
+ }
+ if err := entry.DecodeAttributesAndChunks(data); err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+ dir, name := fullpath.DirAndName()
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
+ if err != nil {
+ return fmt.Errorf("delete %s: %s", fullpath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ sqlText := store.SqlListExclusive
+ if inclusive {
+ sqlText = store.SqlListInclusive
+ }
+
+ rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit)
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var name string
+ var data []byte
+ if err = rows.Scan(&name, &data); err != nil {
+ glog.V(0).Infof("scan %s : %v", fullpath, err)
+ return nil, fmt.Errorf("scan %s: %v", fullpath, err)
+ }
+
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(fullpath), name),
+ }
+ if err = entry.DecodeAttributesAndChunks(data); err != nil {
+ glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ }
+
+ entries = append(entries, entry)
+ }
+
+ return entries, nil
+}
+
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+}
+
+func (store *AbstractSqlStore) Shutdown() {
+ store.DB.Close()
+}
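
Note: the concrete SQL backends (mysql, postgres in the diffstat above) are expected to plug their dialect-specific statements into these AbstractSqlStore fields. A minimal in-package sketch follows; the column names (dirhash, name, directory, meta) are inferred from the placeholder order used by InsertEntry, UpdateEntry, FindEntry and ListDirectoryPrefixedEntries above, not taken from this diff, so treat them as an assumption.

    // newExampleStore is a hypothetical constructor inside package abstract_sql.
    // It only illustrates how the statement templates line up with the argument
    // order used by the methods above; real stores define their own schema.
    func newExampleStore(db *sql.DB) *AbstractSqlStore {
        return &AbstractSqlStore{
            DB:                      db,
            SqlInsert:               "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)",
            SqlUpdate:               "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?",
            SqlFind:                 "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?",
            SqlDelete:               "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?",
            SqlDeleteFolderChildren: "DELETE FROM filemeta WHERE dirhash=? AND directory=?",
            SqlListExclusive:        "SELECT name, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name LIKE CONCAT(?,'%') ORDER BY name ASC LIMIT ?",
            SqlListInclusive:        "SELECT name, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name LIKE CONCAT(?,'%') ORDER BY name ASC LIMIT ?",
        }
    }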
diff --git a/weed/filer/cassandra/README.txt b/weed/filer/cassandra/README.txt
new file mode 100644
index 000000000..122c9c3f4
--- /dev/null
+++ b/weed/filer/cassandra/README.txt
@@ -0,0 +1,14 @@
+1. create a keyspace
+
+CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
+
+2. create filemeta table
+
+ USE seaweedfs;
+
+ CREATE TABLE filemeta (
+ directory varchar,
+ name varchar,
+ meta blob,
+ PRIMARY KEY (directory, name)
+ ) WITH CLUSTERING ORDER BY (name ASC);
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
new file mode 100644
index 000000000..fd161b1f1
--- /dev/null
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -0,0 +1,163 @@
+package cassandra
+
+import (
+ "context"
+ "fmt"
+ "github.com/gocql/gocql"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &CassandraStore{})
+}
+
+type CassandraStore struct {
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+}
+
+func (store *CassandraStore) GetName() string {
+ return "cassandra"
+}
+
+func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"keyspace"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ )
+}
+
+func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
+ store.cluster = gocql.NewCluster(hosts...)
+ store.cluster.Keyspace = keyspace
+ store.cluster.Consistency = gocql.LocalQuorum
+ store.session, err = store.cluster.CreateSession()
+ if err != nil {
+ glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
+ }
+ return
+}
+
+func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if err := store.session.Query(
+ "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
+ dir, name, meta, entry.TtlSec).Exec(); err != nil {
+ return fmt.Errorf("insert %s: %s", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ dir, name := fullpath.DirAndName()
+ var data []byte
+ if err := store.session.Query(
+ "SELECT meta FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Consistency(gocql.One).Scan(&data); err != nil {
+ if err != gocql.ErrNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ }
+
+ if len(data) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+ dir, name := fullpath.DirAndName()
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=?",
+ fullpath).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+
+ cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
+ if inclusive {
+ cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
+ }
+
+ var data []byte
+ var name string
+ iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
+ for iter.Scan(&name, &data) {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(fullpath), name),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ if err := iter.Close(); err != nil {
+ glog.V(0).Infof("list iterator close: %v", err)
+ }
+
+ return entries, err
+}
+
+func (store *CassandraStore) Shutdown() {
+ store.session.Close()
+}
diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go
new file mode 100644
index 000000000..3dce67d6d
--- /dev/null
+++ b/weed/filer/configuration.go
@@ -0,0 +1,50 @@
+package filer
+
+import (
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+)
+
+var (
+ Stores []FilerStore
+)
+
+func (f *Filer) LoadConfiguration(config *viper.Viper) {
+
+ validateOneEnabledStore(config)
+
+ for _, store := range Stores {
+ if config.GetBool(store.GetName() + ".enabled") {
+ if err := store.Initialize(config, store.GetName()+"."); err != nil {
+ glog.Fatalf("Failed to initialize store for %s: %+v",
+ store.GetName(), err)
+ }
+ f.SetStore(store)
+ glog.V(0).Infof("Configure filer for %s", store.GetName())
+ return
+ }
+ }
+
+ println()
+ println("Supported filer stores are:")
+ for _, store := range Stores {
+ println(" " + store.GetName())
+ }
+
+ os.Exit(-1)
+}
+
+func validateOneEnabledStore(config *viper.Viper) {
+ enabledStore := ""
+ for _, store := range Stores {
+ if config.GetBool(store.GetName() + ".enabled") {
+ if enabledStore == "" {
+ enabledStore = store.GetName()
+ } else {
+ glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName())
+ }
+ }
+ }
+}
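
Note: a short sketch of how a store is selected through this path, assuming a *filer.Filer value already exists. Exactly one "<store>.enabled" flag may be true; the remaining keys are then read by that store's Initialize with the "<store>." prefix. The cassandra keys below mirror what CassandraStore.Initialize reads earlier in this diff and are illustrative only.

    import (
        "github.com/chrislusf/seaweedfs/weed/filer"
        _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" // registers into filer.Stores via init()
        "github.com/spf13/viper"
    )

    // configureCassandra is a hypothetical helper, not part of this commit.
    func configureCassandra(f *filer.Filer) {
        v := viper.New()
        v.Set("cassandra.enabled", true)
        v.Set("cassandra.keyspace", "seaweedfs")
        v.Set("cassandra.hosts", []string{"localhost:9042"})
        f.LoadConfiguration(v) // selects cassandra and calls Initialize(v, "cassandra.")
    }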
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
new file mode 100644
index 000000000..4a73de19a
--- /dev/null
+++ b/weed/filer/entry.go
@@ -0,0 +1,91 @@
+package filer
+
+import (
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Attr struct {
+ Mtime time.Time // time of last modification
+ Crtime time.Time // time of creation (OS X only)
+ Mode os.FileMode // file mode
+ Uid uint32 // owner uid
+ Gid uint32 // group gid
+ Mime string // mime type
+ Replication string // replication
+ Collection string // collection name
+ TtlSec int32 // ttl in seconds
+ UserName string
+ GroupNames []string
+ SymlinkTarget string
+ Md5 []byte
+ FileSize uint64
+}
+
+func (attr Attr) IsDirectory() bool {
+ return attr.Mode&os.ModeDir > 0
+}
+
+type Entry struct {
+ util.FullPath
+
+ Attr
+ Extended map[string][]byte
+
+ // the following is for files
+ Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
+}
+
+func (entry *Entry) Size() uint64 {
+ return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
+}
+
+func (entry *Entry) Timestamp() time.Time {
+ if entry.IsDirectory() {
+ return entry.Crtime
+ } else {
+ return entry.Mtime
+ }
+}
+
+func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
+ if entry == nil {
+ return nil
+ }
+ return &filer_pb.Entry{
+ Name: entry.FullPath.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Attributes: EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ }
+}
+
+func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
+ if entry == nil {
+ return nil
+ }
+ dir, _ := entry.FullPath.DirAndName()
+ return &filer_pb.FullEntry{
+ Dir: dir,
+ Entry: entry.ToProtoEntry(),
+ }
+}
+
+func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
+ return &Entry{
+ FullPath: util.NewFullPath(dir, entry.Name),
+ Attr: PbToEntryAttribute(entry.Attributes),
+ Chunks: entry.Chunks,
+ }
+}
+
+func maxUint64(x, y uint64) uint64 {
+ if x > y {
+ return x
+ }
+ return y
+}
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
new file mode 100644
index 000000000..fb6448b30
--- /dev/null
+++ b/weed/filer/entry_codec.go
@@ -0,0 +1,124 @@
+package filer
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
+ message := &filer_pb.Entry{
+ Attributes: EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ }
+ return proto.Marshal(message)
+}
+
+func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
+
+ message := &filer_pb.Entry{}
+
+ if err := proto.UnmarshalMerge(blob, message); err != nil {
+ return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
+ }
+
+ entry.Attr = PbToEntryAttribute(message.Attributes)
+
+ entry.Extended = message.Extended
+
+ entry.Chunks = message.Chunks
+
+ return nil
+}
+
+func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
+
+ return &filer_pb.FuseAttributes{
+ Crtime: entry.Attr.Crtime.Unix(),
+ Mtime: entry.Attr.Mtime.Unix(),
+ FileMode: uint32(entry.Attr.Mode),
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Mime: entry.Mime,
+ Collection: entry.Attr.Collection,
+ Replication: entry.Attr.Replication,
+ TtlSec: entry.Attr.TtlSec,
+ UserName: entry.Attr.UserName,
+ GroupName: entry.Attr.GroupNames,
+ SymlinkTarget: entry.Attr.SymlinkTarget,
+ Md5: entry.Attr.Md5,
+ FileSize: entry.Attr.FileSize,
+ }
+}
+
+func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
+
+ t := Attr{}
+
+ t.Crtime = time.Unix(attr.Crtime, 0)
+ t.Mtime = time.Unix(attr.Mtime, 0)
+ t.Mode = os.FileMode(attr.FileMode)
+ t.Uid = attr.Uid
+ t.Gid = attr.Gid
+ t.Mime = attr.Mime
+ t.Collection = attr.Collection
+ t.Replication = attr.Replication
+ t.TtlSec = attr.TtlSec
+ t.UserName = attr.UserName
+ t.GroupNames = attr.GroupName
+ t.SymlinkTarget = attr.SymlinkTarget
+ t.Md5 = attr.Md5
+ t.FileSize = attr.FileSize
+
+ return t
+}
+
+func EqualEntry(a, b *Entry) bool {
+ if a == b {
+ return true
+ }
+ if a == nil && b != nil || a != nil && b == nil {
+ return false
+ }
+ if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) {
+ return false
+ }
+ if len(a.Chunks) != len(b.Chunks) {
+ return false
+ }
+
+ if !eq(a.Extended, b.Extended) {
+ return false
+ }
+
+ if !bytes.Equal(a.Md5, b.Md5) {
+ return false
+ }
+
+ for i := 0; i < len(a.Chunks); i++ {
+ if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
+ return false
+ }
+ }
+ return true
+}
+
+func eq(a, b map[string][]byte) bool {
+ if len(a) != len(b) {
+ return false
+ }
+
+ for k, v := range a {
+ if w, ok := b[k]; !ok || !bytes.Equal(v, w) {
+ return false
+ }
+ }
+
+ return true
+}
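
Note: a hypothetical round trip through the codec above, showing that the meta blob only carries attributes, chunks and extended data; the full path is the store key and is supplied again when decoding. Values are made up for illustration.

    package main

    import (
        "log"
        "time"

        "github.com/chrislusf/seaweedfs/weed/filer"
        "github.com/chrislusf/seaweedfs/weed/util"
    )

    func main() {
        src := &filer.Entry{
            FullPath: util.NewFullPath("/home/user", "notes.txt"),
            Attr: filer.Attr{
                Crtime:   time.Now(), // stored at second precision via Unix()
                Mtime:    time.Now(),
                Mode:     0644,
                Uid:      1000,
                Gid:      1000,
                FileSize: 12,
            },
        }

        blob, err := src.EncodeAttributesAndChunks() // protobuf-encoded filer_pb.Entry
        if err != nil {
            log.Fatal(err)
        }

        dst := &filer.Entry{FullPath: src.FullPath} // caller supplies the path when decoding
        if err := dst.DecodeAttributesAndChunks(blob); err != nil {
            log.Fatal(err)
        }
        log.Printf("decoded size=%d mode=%v", dst.FileSize, dst.Mode)
    }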
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
new file mode 100644
index 000000000..36db4ac01
--- /dev/null
+++ b/weed/filer/etcd/etcd_store.go
@@ -0,0 +1,204 @@
+package etcd
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ "go.etcd.io/etcd/clientv3"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_FILE_SEPARATOR = byte(0x00)
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &EtcdStore{})
+}
+
+type EtcdStore struct {
+ client *clientv3.Client
+}
+
+func (store *EtcdStore) GetName() string {
+ return "etcd"
+}
+
+func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ servers := configuration.GetString(prefix + "servers")
+ if servers == "" {
+ servers = "localhost:2379"
+ }
+
+ timeout := configuration.GetString(prefix + "timeout")
+ if timeout == "" {
+ timeout = "3s"
+ }
+
+ return store.initialize(servers, timeout)
+}
+
+func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
+ glog.Infof("filer store etcd: %s", servers)
+
+ to, err := time.ParseDuration(timeout)
+ if err != nil {
+ return fmt.Errorf("parse timeout %s: %s", timeout, err)
+ }
+
+ store.client, err = clientv3.New(clientv3.Config{
+ Endpoints: strings.Split(servers, ","),
+ DialTimeout: to,
+ })
+ if err != nil {
+ return fmt.Errorf("connect to etcd %s: %s", servers, err)
+ }
+
+ return
+}
+
+func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ key := genKey(entry.DirAndName())
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ key := genKey(fullpath.DirAndName())
+
+ resp, err := store.client.Get(ctx, string(key))
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ if len(resp.Kvs) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ key := genKey(fullpath.DirAndName())
+
+ if _, err := store.client.Delete(ctx, string(key)); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
+ return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ resp, err := store.client.Get(ctx, string(directoryPrefix),
+ clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ for _, kv := range resp.Kvs {
+ fileName := getNameFromKey(kv.Key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = []byte(dirPath)
+ key = append(key, DIR_FILE_SEPARATOR)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = []byte(string(fullpath))
+ keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+ sepIndex := len(key) - 1
+ for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
+ sepIndex--
+ }
+
+ return string(key[sepIndex+1:])
+}
+
+func (store *EtcdStore) Shutdown() {
+ store.client.Close()
+}
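
Note: the key helpers above join directory and file name with a 0x00 separator so that listing a directory becomes a prefix scan. A small in-package sketch with hypothetical values:

    // Inside package etcd, for illustration only.
    key := genKey("/home/user", "notes.txt")                              // "/home/user\x00notes.txt"
    name := getNameFromKey(key)                                           // "notes.txt"
    prefix := genDirectoryKeyPrefix(weed_util.FullPath("/home/user"), "") // "/home/user\x00", scanned with clientv3.WithPrefix()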
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
new file mode 100644
index 000000000..e84cf21e5
--- /dev/null
+++ b/weed/filer/filechunk_manifest.go
@@ -0,0 +1,168 @@
+package filer
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ ManifestBatch = 1000
+)
+
+func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
+ for _, chunk := range chunks {
+ if chunk.IsChunkManifest {
+ return true
+ }
+ }
+ return false
+}
+
+func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
+ for _, c := range chunks {
+ if c.IsChunkManifest {
+ manifestChunks = append(manifestChunks, c)
+ } else {
+ nonManifestChunks = append(nonManifestChunks, c)
+ }
+ }
+ return
+}
+
+func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+ // TODO maybe parallel this
+ for _, chunk := range chunks {
+ if !chunk.IsChunkManifest {
+ dataChunks = append(dataChunks, chunk)
+ continue
+ }
+
+ resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
+ if err != nil {
+ return chunks, nil, err
+ }
+
+ manifestChunks = append(manifestChunks, chunk)
+ // recursive
+ dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
+ if subErr != nil {
+ return chunks, nil, subErr
+ }
+ dataChunks = append(dataChunks, dchunks...)
+ manifestChunks = append(manifestChunks, mchunks...)
+ }
+ return
+}
+
+func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+ if !chunk.IsChunkManifest {
+ return
+ }
+
+ // IsChunkManifest
+ data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ if err != nil {
+ return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
+ }
+ m := &filer_pb.FileChunkManifest{}
+ if err := proto.Unmarshal(data, m); err != nil {
+ return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
+ }
+
+ // recursive
+ filer_pb.AfterEntryDeserialization(m.Chunks)
+ return m.Chunks, nil
+}
+
+// TODO fetch from cache for weed mount?
+func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+ urlString, err := lookupFileIdFn(fileId)
+ if err != nil {
+ glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+ var buffer bytes.Buffer
+ err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(0).Infof("read %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+
+ return buffer.Bytes(), nil
+}
+
+func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
+ return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
+}
+
+func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
+
+ var dataChunks []*filer_pb.FileChunk
+ for _, chunk := range inputChunks {
+ if !chunk.IsChunkManifest {
+ dataChunks = append(dataChunks, chunk)
+ } else {
+ chunks = append(chunks, chunk)
+ }
+ }
+
+ remaining := len(dataChunks)
+ for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
+ chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
+ if err != nil {
+ return dataChunks, err
+ }
+ chunks = append(chunks, chunk)
+ remaining -= mergeFactor
+ }
+ // remaining
+ for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
+ chunks = append(chunks, dataChunks[i])
+ }
+ return
+}
+
+func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
+
+ filer_pb.BeforeEntrySerialization(dataChunks)
+
+ // create and serialize the manifest
+ data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
+ Chunks: dataChunks,
+ })
+ if serErr != nil {
+ return nil, fmt.Errorf("serializing manifest: %v", serErr)
+ }
+
+ minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
+ for _, chunk := range dataChunks {
+ if minOffset > int64(chunk.Offset) {
+ minOffset = chunk.Offset
+ }
+ if maxOffset < int64(chunk.Size)+chunk.Offset {
+ maxOffset = int64(chunk.Size) + chunk.Offset
+ }
+ }
+
+ manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
+ if err != nil {
+ return nil, err
+ }
+ manifestChunk.IsChunkManifest = true
+ manifestChunk.Offset = minOffset
+ manifestChunk.Size = uint64(maxOffset - minOffset)
+
+ return
+}
+
+type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)
diff --git a/weed/filer/filechunk_manifest_test.go b/weed/filer/filechunk_manifest_test.go
new file mode 100644
index 000000000..ce12c5da6
--- /dev/null
+++ b/weed/filer/filechunk_manifest_test.go
@@ -0,0 +1,113 @@
+package filer
+
+import (
+ "bytes"
+ "math"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestDoMaybeManifestize(t *testing.T) {
+ var manifestTests = []struct {
+ inputs []*filer_pb.FileChunk
+ expected []*filer_pb.FileChunk
+ }{
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: false},
+ {FileId: "2", IsChunkManifest: false},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "12", IsChunkManifest: true},
+ {FileId: "34", IsChunkManifest: true},
+ },
+ },
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "2", IsChunkManifest: false},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "23", IsChunkManifest: true},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ },
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: false},
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "13", IsChunkManifest: true},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ },
+ {
+ inputs: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "3", IsChunkManifest: false},
+ {FileId: "4", IsChunkManifest: false},
+ },
+ expected: []*filer_pb.FileChunk{
+ {FileId: "1", IsChunkManifest: true},
+ {FileId: "2", IsChunkManifest: true},
+ {FileId: "34", IsChunkManifest: true},
+ },
+ },
+ }
+
+ for i, mtest := range manifestTests {
+ println("test", i)
+ actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge)
+ assertEqualChunks(t, mtest.expected, actual)
+ }
+
+}
+
+func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) {
+ assert.Equal(t, len(expected), len(actual))
+ for i := 0; i < len(actual); i++ {
+ assertEqualChunk(t, actual[i], expected[i])
+ }
+}
+func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) {
+ assert.Equal(t, expected.FileId, actual.FileId)
+ assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest)
+}
+
+func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
+
+ var buf bytes.Buffer
+ minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
+ for k := 0; k < len(dataChunks); k++ {
+ chunk := dataChunks[k]
+ buf.WriteString(chunk.FileId)
+ if minOffset > int64(chunk.Offset) {
+ minOffset = chunk.Offset
+ }
+ if maxOffset < int64(chunk.Size)+chunk.Offset {
+ maxOffset = int64(chunk.Size) + chunk.Offset
+ }
+ }
+
+ manifestChunk = &filer_pb.FileChunk{
+ FileId: buf.String(),
+ }
+ manifestChunk.IsChunkManifest = true
+ manifestChunk.Offset = minOffset
+ manifestChunk.Size = uint64(maxOffset - minOffset)
+
+ return
+}
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
new file mode 100644
index 000000000..c45963193
--- /dev/null
+++ b/weed/filer/filechunks.go
@@ -0,0 +1,284 @@
+package filer
+
+import (
+ "fmt"
+ "hash/fnv"
+ "math"
+ "sort"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
+ for _, c := range chunks {
+ t := uint64(c.Offset + int64(c.Size))
+ if size < t {
+ size = t
+ }
+ }
+ return
+}
+
+func FileSize(entry *filer_pb.Entry) (size uint64) {
+ return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+}
+
+func ETag(entry *filer_pb.Entry) (etag string) {
+ if entry.Attributes == nil || entry.Attributes.Md5 == nil {
+ return ETagChunks(entry.Chunks)
+ }
+ return fmt.Sprintf("%x", entry.Attributes.Md5)
+}
+
+func ETagEntry(entry *Entry) (etag string) {
+ if entry.Attr.Md5 == nil {
+ return ETagChunks(entry.Chunks)
+ }
+ return fmt.Sprintf("%x", entry.Attr.Md5)
+}
+
+func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
+ if len(chunks) == 1 {
+ return chunks[0].ETag
+ }
+
+ h := fnv.New32a()
+ for _, c := range chunks {
+ h.Write([]byte(c.ETag))
+ }
+ return fmt.Sprintf("%x", h.Sum32())
+}
+
+func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+
+ fileIds := make(map[string]bool)
+ for _, interval := range visibles {
+ fileIds[interval.fileId] = true
+ }
+ for _, chunk := range chunks {
+ if _, found := fileIds[chunk.GetFileIdString()]; found {
+ compacted = append(compacted, chunk)
+ } else {
+ garbage = append(garbage, chunk)
+ }
+ }
+
+ return
+}
+
+func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+
+ aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+ if aErr != nil {
+ return nil, aErr
+ }
+ bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+ if bErr != nil {
+ return nil, bErr
+ }
+
+ delta = append(delta, DoMinusChunks(aData, bData)...)
+ delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
+ return
+}
+
+func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
+
+ fileIds := make(map[string]bool)
+ for _, interval := range bs {
+ fileIds[interval.GetFileIdString()] = true
+ }
+ for _, chunk := range as {
+ if _, found := fileIds[chunk.GetFileIdString()]; !found {
+ delta = append(delta, chunk)
+ }
+ }
+
+ return
+}
+
+type ChunkView struct {
+ FileId string
+ Offset int64
+ Size uint64
+ LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
+ ChunkSize uint64
+ CipherKey []byte
+ IsGzipped bool
+}
+
+func (cv *ChunkView) IsFullChunk() bool {
+ return cv.Size == cv.ChunkSize
+}
+
+func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+
+ return ViewFromVisibleIntervals(visibles, offset, size)
+
+}
+
+func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) {
+
+ stop := offset + size
+ if size == math.MaxInt64 {
+ stop = math.MaxInt64
+ }
+ if stop < offset {
+ stop = math.MaxInt64
+ }
+
+ for _, chunk := range visibles {
+
+ chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
+
+ if chunkStart < chunkStop {
+ views = append(views, &ChunkView{
+ FileId: chunk.fileId,
+ Offset: chunkStart - chunk.start + chunk.chunkOffset,
+ Size: uint64(chunkStop - chunkStart),
+ LogicOffset: chunkStart,
+ ChunkSize: chunk.chunkSize,
+ CipherKey: chunk.cipherKey,
+ IsGzipped: chunk.isGzipped,
+ })
+ }
+ }
+
+ return views
+
+}
+
+func logPrintf(name string, visibles []VisibleInterval) {
+
+ /*
+ glog.V(0).Infof("%s len %d", name, len(visibles))
+ for _, v := range visibles {
+ glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset)
+ }
+ */
+}
+
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return new(VisibleInterval)
+ },
+}
+
+func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) {
+
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
+
+ length := len(visibles)
+ if length == 0 {
+ return append(visibles, newV)
+ }
+ last := visibles[length-1]
+ if last.stop <= chunk.Offset {
+ return append(visibles, newV)
+ }
+
+ logPrintf(" before", visibles)
+ // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size)
+ chunkStop := chunk.Offset + int64(chunk.Size)
+ for _, v := range visibles {
+ if v.start < chunk.Offset && chunk.Offset < v.stop {
+ t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)
+ newVisibles = append(newVisibles, t)
+ // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
+ }
+ if v.start < chunkStop && chunkStop < v.stop {
+ t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)
+ newVisibles = append(newVisibles, t)
+ // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
+ }
+ if chunkStop <= v.start || v.stop <= chunk.Offset {
+ newVisibles = append(newVisibles, v)
+ // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop)
+ }
+ }
+ newVisibles = append(newVisibles, newV)
+
+ logPrintf(" append", newVisibles)
+
+ for i := len(newVisibles) - 1; i >= 0; i-- {
+ if i > 0 && newV.start < newVisibles[i-1].start {
+ newVisibles[i] = newVisibles[i-1]
+ } else {
+ newVisibles[i] = newV
+ break
+ }
+ }
+ logPrintf(" sorted", newVisibles)
+
+ return newVisibles
+}
+
+// NonOverlappingVisibleIntervals translates the file chunks into non-overlapping VisibleIntervals in memory.
+// If a file chunk is a chunk manifest, it is resolved into its underlying data chunks first.
+func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+
+ chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
+
+ sort.Slice(chunks, func(i, j int) bool {
+ if chunks[i].Mtime == chunks[j].Mtime {
+ return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+ }
+ return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
+ })
+
+ for _, chunk := range chunks {
+
+ // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
+ visibles = MergeIntoVisibles(visibles, chunk)
+
+ logPrintf("add", visibles)
+
+ }
+
+ return
+}
+
+// find non-overlapping visible intervals
+// each visible interval maps to one file chunk
+
+type VisibleInterval struct {
+ start int64
+ stop int64
+ modifiedTime int64
+ fileId string
+ chunkOffset int64
+ chunkSize uint64
+ cipherKey []byte
+ isGzipped bool
+}
+
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
+ return VisibleInterval{
+ start: start,
+ stop: stop,
+ fileId: fileId,
+ modifiedTime: modifiedTime,
+ chunkOffset: chunkOffset, // the starting position in the chunk
+ chunkSize: chunkSize,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ }
+}
+
+func min(x, y int64) int64 {
+ if x <= y {
+ return x
+ }
+ return y
+}
+func max(x, y int64) int64 {
+ if x <= y {
+ return y
+ }
+ return x
+}
diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go
new file mode 100644
index 000000000..9f9566d9b
--- /dev/null
+++ b/weed/filer/filechunks2_test.go
@@ -0,0 +1,46 @@
+package filer
+
+import (
+ "sort"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestCompactFileChunksRealCase(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {FileId: "2,512f31f2c0700a", Offset: 0, Size: 25 - 0, Mtime: 5320497},
+ {FileId: "6,512f2c2e24e9e8", Offset: 868352, Size: 917585 - 868352, Mtime: 5320492},
+ {FileId: "7,514468dd5954ca", Offset: 884736, Size: 901120 - 884736, Mtime: 5325928},
+ {FileId: "5,5144463173fe77", Offset: 917504, Size: 2297856 - 917504, Mtime: 5325894},
+ {FileId: "4,51444c7ab54e2d", Offset: 2301952, Size: 2367488 - 2301952, Mtime: 5325900},
+ {FileId: "4,514450e643ad22", Offset: 2371584, Size: 2420736 - 2371584, Mtime: 5325904},
+ {FileId: "6,514456a5e9e4d7", Offset: 2449408, Size: 2490368 - 2449408, Mtime: 5325910},
+ {FileId: "3,51444f8d53eebe", Offset: 2494464, Size: 2555904 - 2494464, Mtime: 5325903},
+ {FileId: "4,5144578b097c7e", Offset: 2560000, Size: 2596864 - 2560000, Mtime: 5325911},
+ {FileId: "3,51445500b6b4ac", Offset: 2637824, Size: 2678784 - 2637824, Mtime: 5325909},
+ {FileId: "1,51446285e52a61", Offset: 2695168, Size: 2715648 - 2695168, Mtime: 5325922},
+ }
+
+ printChunks("before", chunks)
+
+ compacted, garbage := CompactFileChunks(nil, chunks)
+
+ printChunks("compacted", compacted)
+ printChunks("garbage", garbage)
+
+}
+
+func printChunks(name string, chunks []*filer_pb.FileChunk) {
+ sort.Slice(chunks, func(i, j int) bool {
+ if chunks[i].Offset == chunks[j].Offset {
+ return chunks[i].Mtime < chunks[j].Mtime
+ }
+ return chunks[i].Offset < chunks[j].Offset
+ })
+ for _, chunk := range chunks {
+ glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
+}
diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go
new file mode 100644
index 000000000..699e7e298
--- /dev/null
+++ b/weed/filer/filechunks_test.go
@@ -0,0 +1,539 @@
+package filer
+
+import (
+ "fmt"
+ "log"
+ "math"
+ "math/rand"
+ "strconv"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestCompactFileChunks(t *testing.T) {
+ chunks := []*filer_pb.FileChunk{
+ {Offset: 10, Size: 100, FileId: "abc", Mtime: 50},
+ {Offset: 100, Size: 100, FileId: "def", Mtime: 100},
+ {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200},
+ {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
+ }
+
+ compacted, garbage := CompactFileChunks(nil, chunks)
+
+ if len(compacted) != 3 {
+ t.Fatalf("unexpected compacted: %d", len(compacted))
+ }
+ if len(garbage) != 1 {
+ t.Fatalf("unexpected garbage: %d", len(garbage))
+ }
+
+}
+
+func TestCompactFileChunks2(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 50},
+ {Offset: 100, Size: 100, FileId: "def", Mtime: 100},
+ {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200},
+ {Offset: 0, Size: 100, FileId: "abcf", Mtime: 300},
+ {Offset: 50, Size: 100, FileId: "fhfh", Mtime: 400},
+ {Offset: 100, Size: 100, FileId: "yuyu", Mtime: 500},
+ }
+
+ k := 3
+
+ for n := 0; n < k; n++ {
+ chunks = append(chunks, &filer_pb.FileChunk{
+ Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n),
+ })
+ chunks = append(chunks, &filer_pb.FileChunk{
+ Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k),
+ })
+ }
+
+ compacted, garbage := CompactFileChunks(nil, chunks)
+
+ if len(compacted) != 4 {
+ t.Fatalf("unexpected compacted: %d", len(compacted))
+ }
+ if len(garbage) != 8 {
+ t.Fatalf("unexpected garbage: %d", len(garbage))
+ }
+}
+
+func TestRandomFileChunksCompact(t *testing.T) {
+
+ data := make([]byte, 1024)
+
+ var chunks []*filer_pb.FileChunk
+ for i := 0; i < 15; i++ {
+ start, stop := rand.Intn(len(data)), rand.Intn(len(data))
+ if start > stop {
+ start, stop = stop, start
+ }
+ if start+16 < stop {
+ stop = start + 16
+ }
+ chunk := &filer_pb.FileChunk{
+ FileId: strconv.Itoa(i),
+ Offset: int64(start),
+ Size: uint64(stop - start),
+ Mtime: int64(i),
+ Fid: &filer_pb.FileId{FileKey: uint64(i)},
+ }
+ chunks = append(chunks, chunk)
+ for x := start; x < stop; x++ {
+ data[x] = byte(i)
+ }
+ }
+
+ visibles, _ := NonOverlappingVisibleIntervals(nil, chunks)
+
+ for _, v := range visibles {
+ for x := v.start; x < v.stop; x++ {
+ assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId)
+ }
+ }
+
+}
+
+func TestIntervalMerging(t *testing.T) {
+
+ testcases := []struct {
+ Chunks []*filer_pb.FileChunk
+ Expected []*VisibleInterval
+ }{
+ // case 0: normal
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
+ {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 100, fileId: "abc"},
+ {start: 100, stop: 200, fileId: "asdf"},
+ {start: 200, stop: 300, fileId: "fsad"},
+ },
+ },
+ // case 1: updates overwrite full chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 200, fileId: "asdf"},
+ },
+ },
+ // case 2: updates overwrite part of previous chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 0, Size: 70, FileId: "b", Mtime: 134},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 70, fileId: "b"},
+ {start: 70, stop: 100, fileId: "a", chunkOffset: 70},
+ },
+ },
+ // case 3: updates overwrite full chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
+ {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 50, fileId: "asdf"},
+ {start: 50, stop: 300, fileId: "xxxx"},
+ },
+ },
+ // case 4: updates far away from prev chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
+ {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 200, fileId: "asdf"},
+ {start: 250, stop: 500, fileId: "xxxx"},
+ },
+ },
+ // case 5: updates overwrite full chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "d", Mtime: 184},
+ {Offset: 70, Size: 150, FileId: "c", Mtime: 143},
+ {Offset: 80, Size: 100, FileId: "b", Mtime: 134},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 200, fileId: "d"},
+ {start: 200, stop: 220, fileId: "c", chunkOffset: 130},
+ },
+ },
+ // case 6: same updates
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123},
+ {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123},
+ {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 100, fileId: "xyz"},
+ },
+ },
+ // case 7: real updates
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123},
+ {Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130},
+ {Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140},
+ {Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150},
+ {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160},
+ {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 2097152, fileId: "3,029565bf3092"},
+ {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"},
+ {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"},
+ {start: 8388608, stop: 11534336, fileId: "5,02982f80de50"},
+ {start: 11534336, stop: 14376529, fileId: "7,0299ad723803"},
+ },
+ },
+ // case 8: real bug
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 77824, FileId: "4,0b3df938e301", Mtime: 123},
+ {Offset: 471040, Size: 472225 - 471040, FileId: "6,0b3e0650019c", Mtime: 130},
+ {Offset: 77824, Size: 208896 - 77824, FileId: "4,0b3f0c7202f0", Mtime: 140},
+ {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150},
+ {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160},
+ },
+ Expected: []*VisibleInterval{
+ {start: 0, stop: 77824, fileId: "4,0b3df938e301"},
+ {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"},
+ {start: 208896, stop: 339968, fileId: "2,0b4031a72689"},
+ {start: 339968, stop: 471040, fileId: "3,0b416a557362"},
+ {start: 471040, stop: 472225, fileId: "6,0b3e0650019c"},
+ },
+ },
+ }
+
+ for i, testcase := range testcases {
+ log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
+ intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
+ for x, interval := range intervals {
+ log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
+ i, x, interval.start, interval.stop, interval.fileId)
+ }
+ for x, interval := range intervals {
+ if interval.start != testcase.Expected[x].start {
+ t.Fatalf("failed on test case %d, interval %d, start %d, expect %d",
+ i, x, interval.start, testcase.Expected[x].start)
+ }
+ if interval.stop != testcase.Expected[x].stop {
+ t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d",
+ i, x, interval.stop, testcase.Expected[x].stop)
+ }
+ if interval.fileId != testcase.Expected[x].fileId {
+ t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
+ i, x, interval.fileId, testcase.Expected[x].fileId)
+ }
+ if interval.chunkOffset != testcase.Expected[x].chunkOffset {
+ t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d",
+ i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset)
+ }
+ }
+ if len(intervals) != len(testcase.Expected) {
+ t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
+ }
+
+ }
+
+}
+
+func TestChunksReading(t *testing.T) {
+
+ testcases := []struct {
+ Chunks []*filer_pb.FileChunk
+ Offset int64
+ Size int64
+ Expected []*ChunkView
+ }{
+ // case 0: normal
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
+ {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
+ },
+ Offset: 0,
+ Size: 250,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
+ {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
+ {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200},
+ },
+ },
+ // case 1: updates overwrite full chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
+ },
+ Offset: 50,
+ Size: 100,
+ Expected: []*ChunkView{
+ {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50},
+ },
+ },
+ // case 2: updates overwrite part of previous chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 3, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 10, Size: 50, FileId: "b", Mtime: 134},
+ },
+ Offset: 30,
+ Size: 40,
+ Expected: []*ChunkView{
+ {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30},
+ {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60},
+ },
+ },
+ // case 3: updates overwrite full chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
+ {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154},
+ },
+ Offset: 0,
+ Size: 200,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0},
+ {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50},
+ },
+ },
+ // case 4: updates far away from prev chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
+ {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154},
+ },
+ Offset: 0,
+ Size: 400,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
+ {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250},
+ },
+ },
+ // case 5: updates overwrite full chunks
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "c", Mtime: 184},
+ {Offset: 70, Size: 150, FileId: "b", Mtime: 143},
+ {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
+ },
+ Offset: 0,
+ Size: 220,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0},
+ {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200},
+ },
+ },
+ // case 6: same updates
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123},
+ {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123},
+ {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123},
+ },
+ Offset: 0,
+ Size: 100,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0},
+ },
+ },
+ // case 7: edge cases
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
+ {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
+ },
+ Offset: 0,
+ Size: 200,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
+ {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
+ },
+ },
+ // case 8: edge cases
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
+ {Offset: 90, Size: 200, FileId: "asdf", Mtime: 134},
+ {Offset: 190, Size: 300, FileId: "fsad", Mtime: 353},
+ },
+ Offset: 0,
+ Size: 300,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0},
+ {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90},
+ {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190},
+ },
+ },
+ // case 9: edge cases
+ {
+ Chunks: []*filer_pb.FileChunk{
+ {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1},
+ {Offset: 43175936, Size: 52981771 - 43175936, FileId: "2,112a36ea7f85", Mtime: 2},
+ {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3},
+ {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4},
+ {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5},
+ {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6},
+ },
+ Offset: 0,
+ Size: 153578836,
+ Expected: []*ChunkView{
+ {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0},
+ {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936},
+ {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760},
+ {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736},
+ {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168},
+ {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248},
+ },
+ },
+ }
+
+ for i, testcase := range testcases {
+ // to debug a single case, re-enable the filter here, e.g. `if i != 2 { continue }`
+ log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
+ chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
+ for x, chunk := range chunks {
+ log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
+ i, x, chunk.Offset, chunk.Size, chunk.FileId)
+ if chunk.Offset != testcase.Expected[x].Offset {
+ t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d",
+ i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset)
+ }
+ if chunk.Size != testcase.Expected[x].Size {
+ t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d",
+ i, chunk.FileId, chunk.Size, testcase.Expected[x].Size)
+ }
+ if chunk.FileId != testcase.Expected[x].FileId {
+ t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
+ i, x, chunk.FileId, testcase.Expected[x].FileId)
+ }
+ if chunk.LogicOffset != testcase.Expected[x].LogicOffset {
+ t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d",
+ i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset)
+ }
+ }
+ if len(chunks) != len(testcase.Expected) {
+ t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected))
+ }
+ }
+
+}
+
+func BenchmarkCompactFileChunks(b *testing.B) {
+
+ var chunks []*filer_pb.FileChunk
+
+ k := 1024
+
+ for n := 0; n < k; n++ {
+ chunks = append(chunks, &filer_pb.FileChunk{
+ Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n),
+ })
+ chunks = append(chunks, &filer_pb.FileChunk{
+ Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k),
+ })
+ }
+
+ for n := 0; n < b.N; n++ {
+ CompactFileChunks(nil, chunks)
+ }
+}
+
+func TestViewFromVisibleIntervals(t *testing.T) {
+ visibles := []VisibleInterval{
+ {
+ start: 0,
+ stop: 25,
+ fileId: "fid1",
+ },
+ {
+ start: 4096,
+ stop: 8192,
+ fileId: "fid2",
+ },
+ {
+ start: 16384,
+ stop: 18551,
+ fileId: "fid3",
+ },
+ }
+
+ views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
+
+ assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+
+}
+
+func TestViewFromVisibleIntervals2(t *testing.T) {
+ visibles := []VisibleInterval{
+ {
+ start: 344064,
+ stop: 348160,
+ fileId: "fid1",
+ },
+ {
+ start: 348160,
+ stop: 356352,
+ fileId: "fid2",
+ },
+ }
+
+ views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
+
+ assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+
+}
+
+func TestViewFromVisibleIntervals3(t *testing.T) {
+ visibles := []VisibleInterval{
+ {
+ start: 1000,
+ stop: 2000,
+ fileId: "fid1",
+ },
+ {
+ start: 3000,
+ stop: 4000,
+ fileId: "fid2",
+ },
+ }
+
+ views := ViewFromVisibleIntervals(visibles, 1700, 1500)
+
+ assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+
+}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
new file mode 100644
index 000000000..71da1a080
--- /dev/null
+++ b/weed/filer/filer.go
@@ -0,0 +1,290 @@
+package filer
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+const (
+ LogFlushInterval = time.Minute
+ PaginationSize = 1024 * 256
+)
+
+var (
+ OS_UID = uint32(os.Getuid())
+ OS_GID = uint32(os.Getgid())
+ ErrUnsupportedListDirectoryPrefixed = errors.New("UNSUPPORTED")
+)
+
+type Filer struct {
+ Store *FilerStoreWrapper
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionQueue *util.UnboundedQueue
+ GrpcDialOption grpc.DialOption
+ DirBucketsPath string
+ FsyncBuckets []string
+ buckets *FilerBuckets
+ Cipher bool
+ LocalMetaLogBuffer *log_buffer.LogBuffer
+ metaLogCollection string
+ metaLogReplication string
+ MetaAggregator *MetaAggregator
+ Signature int32
+}
+
+func NewFiler(masters []string, grpcDialOption grpc.DialOption,
+ filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
+ f := &Filer{
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
+ fileIdDeletionQueue: util.NewUnboundedQueue(),
+ GrpcDialOption: grpcDialOption,
+ Signature: util.RandomInt32(),
+ }
+ f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
+ f.metaLogCollection = collection
+ f.metaLogReplication = replication
+
+ go f.loopProcessingDeletion()
+
+ return f
+}
+
+func (f *Filer) AggregateFromPeers(self string, filers []string) {
+
+ // set peers
+ if len(filers) == 0 {
+ filers = append(filers, self)
+ }
+ f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
+ f.MetaAggregator.StartLoopSubscribe(f, self)
+
+}
+
+func (f *Filer) SetStore(store FilerStore) {
+ f.Store = NewFilerStoreWrapper(store)
+}
+
+func (f *Filer) GetStore() (store FilerStore) {
+ return f.Store
+}
+
+func (fs *Filer) GetMaster() string {
+ return fs.MasterClient.GetMaster()
+}
+
+func (fs *Filer) KeepConnectedToMaster() {
+ fs.MasterClient.KeepConnectedToMaster()
+}
+
+func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return f.Store.BeginTransaction(ctx)
+}
+
+func (f *Filer) CommitTransaction(ctx context.Context) error {
+ return f.Store.CommitTransaction(ctx)
+}
+
+func (f *Filer) RollbackTransaction(ctx context.Context) error {
+ return f.Store.RollbackTransaction(ctx)
+}
+
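+// CreateEntry makes sure every parent directory exists, similar to "mkdir -p",
+// then inserts the entry, or updates it if it already exists and o_excl is false.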
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
+
+ if string(entry.FullPath) == "/" {
+ return nil
+ }
+
+ dirParts := strings.Split(string(entry.FullPath), "/")
+
+ // fmt.Printf("directory parts: %+v\n", dirParts)
+
+ var lastDirectoryEntry *Entry
+
+ for i := 1; i < len(dirParts); i++ {
+ dirPath := "/" + util.Join(dirParts[:i]...)
+ // fmt.Printf("%d directory: %+v\n", i, dirPath)
+
+ // check the store directly
+ glog.V(4).Infof("find uncached directory: %s", dirPath)
+ dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
+
+ // no such existing directory
+ if dirEntry == nil {
+
+ // create the directory
+ now := time.Now()
+
+ dirEntry = &Entry{
+ FullPath: util.FullPath(dirPath),
+ Attr: Attr{
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
+ },
+ }
+
+ glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
+ mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
+ if mkdirErr != nil {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
+ return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
+ }
+ } else {
+ f.maybeAddBucket(dirEntry)
+ f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
+ }
+
+ } else if !dirEntry.IsDirectory() {
+ glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
+ return fmt.Errorf("%s is a file", dirPath)
+ }
+
+ // remember the direct parent directory entry
+ if i == len(dirParts)-1 {
+ lastDirectoryEntry = dirEntry
+ }
+
+ }
+
+ if lastDirectoryEntry == nil {
+ glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
+ return fmt.Errorf("parent folder not found: %v", entry.FullPath)
+ }
+
+ /*
+ if !hasWritePermission(lastDirectoryEntry, entry) {
+ glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
+ lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
+ return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
+ }
+ */
+
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
+
+ glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl)
+ if oldEntry == nil {
+ if err := f.Store.InsertEntry(ctx, entry); err != nil {
+ glog.Errorf("insert entry %s: %v", entry.FullPath, err)
+ return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
+ }
+ } else {
+ if o_excl {
+ glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
+ return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
+ }
+ if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
+ glog.Errorf("update entry %s: %v", entry.FullPath, err)
+ return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
+ }
+ }
+
+ f.maybeAddBucket(entry)
+ f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
+
+ f.deleteChunksIfNotNew(oldEntry, entry)
+
+ glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
+
+ return nil
+}
+
+func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
+ if oldEntry != nil {
+ if oldEntry.IsDirectory() && !entry.IsDirectory() {
+ glog.Errorf("existing %s is a directory", entry.FullPath)
+ return fmt.Errorf("existing %s is a directory", entry.FullPath)
+ }
+ if !oldEntry.IsDirectory() && entry.IsDirectory() {
+ glog.Errorf("existing %s is a file", entry.FullPath)
+ return fmt.Errorf("existing %s is a file", entry.FullPath)
+ }
+ }
+ return f.Store.UpdateEntry(ctx, entry)
+}
+
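+// FindEntry returns a synthesized entry for the root path "/", and treats
+// TTL-expired entries as not found, lazily deleting them from the store.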
+func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
+
+ now := time.Now()
+
+ if string(p) == "/" {
+ return &Entry{
+ FullPath: p,
+ Attr: Attr{
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | 0755,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }, nil
+ }
+ entry, err = f.Store.FindEntry(ctx, p)
+ if entry != nil && entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
+ return nil, filer_pb.ErrNotFound
+ }
+ }
+ return
+
+}
+
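+// ListDirectoryEntries lists up to limit children of a directory. TTL-expired
+// entries are removed on the fly, and extra pages are fetched to make up for them.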
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) {
+ if strings.HasSuffix(string(p), "/") && len(p) > 1 {
+ p = p[0 : len(p)-1]
+ }
+
+ var makeupEntries []*Entry
+ entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
+ for expiredCount > 0 && err == nil {
+ makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
+ if err == nil {
+ entries = append(entries, makeupEntries...)
+ }
+ }
+
+ return entries, err
+}
+
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
+ listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
+ if listErr != nil {
+ return listedEntries, expiredCount, "", listErr
+ }
+ for _, entry := range listedEntries {
+ lastFileName = entry.Name()
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
+ expiredCount++
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ return
+}
+
+func (f *Filer) Shutdown() {
+ f.LocalMetaLogBuffer.Shutdown()
+ f.Store.Shutdown()
+}
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
new file mode 100644
index 000000000..4d4f4abc3
--- /dev/null
+++ b/weed/filer/filer_buckets.go
@@ -0,0 +1,121 @@
+package filer
+
+import (
+ "context"
+ "math"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type BucketName string
+type BucketOption struct {
+ Name BucketName
+ Replication string
+ fsync bool
+}
+type FilerBuckets struct {
+ dirBucketsPath string
+ buckets map[BucketName]*BucketOption
+ sync.RWMutex
+}
+
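+// LoadBuckets scans DirBucketsPath once to build the in-memory bucket map,
+// marking the buckets named in FsyncBuckets with the fsync option.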
+func (f *Filer) LoadBuckets() {
+
+ f.buckets = &FilerBuckets{
+ buckets: make(map[BucketName]*BucketOption),
+ }
+
+ limit := math.MaxInt32
+
+ entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
+
+ if err != nil {
+ glog.V(1).Infof("no buckets found: %v", err)
+ return
+ }
+
+ shouldFsyncMap := make(map[string]bool)
+ for _, bucket := range f.FsyncBuckets {
+ shouldFsyncMap[bucket] = true
+ }
+
+ glog.V(1).Infof("buckets found: %d", len(entries))
+
+ f.buckets.Lock()
+ for _, entry := range entries {
+ _, shouldFsync := shouldFsyncMap[entry.Name()]
+ f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
+ Name: BucketName(entry.Name()),
+ Replication: entry.Replication,
+ fsync: shouldFsync,
+ }
+ }
+ f.buckets.Unlock()
+
+}
+
+func (f *Filer) ReadBucketOption(bucketName string) (replication string, fsync bool) {
+
+ f.buckets.RLock()
+ defer f.buckets.RUnlock()
+
+ option, found := f.buckets.buckets[BucketName(bucketName)]
+
+ if !found {
+ return "", false
+ }
+ return option.Replication, option.fsync
+
+}
+
+func (f *Filer) isBucket(entry *Entry) bool {
+ if !entry.IsDirectory() {
+ return false
+ }
+ parent, dirName := entry.FullPath.DirAndName()
+ if parent != f.DirBucketsPath {
+ return false
+ }
+
+ f.buckets.RLock()
+ defer f.buckets.RUnlock()
+
+ _, found := f.buckets.buckets[BucketName(dirName)]
+
+ return found
+
+}
+
+func (f *Filer) maybeAddBucket(entry *Entry) {
+ if !entry.IsDirectory() {
+ return
+ }
+ parent, dirName := entry.FullPath.DirAndName()
+ if parent != f.DirBucketsPath {
+ return
+ }
+ f.addBucket(dirName, &BucketOption{
+ Name: BucketName(dirName),
+ Replication: entry.Replication,
+ })
+}
+
+func (f *Filer) addBucket(bucketName string, bucketOption *BucketOption) {
+
+ f.buckets.Lock()
+ defer f.buckets.Unlock()
+
+ f.buckets.buckets[BucketName(bucketName)] = bucketOption
+
+}
+
+func (f *Filer) deleteBucket(bucketName string) {
+
+ f.buckets.Lock()
+ defer f.buckets.Unlock()
+
+ delete(f.buckets.buckets, BucketName(bucketName))
+
+}
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
new file mode 100644
index 000000000..e2198bd21
--- /dev/null
+++ b/weed/filer/filer_delete_entry.go
@@ -0,0 +1,129 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
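+// DeleteEntryMetaAndData deletes an entry and, for a directory, all of its children.
+// File chunks are collected and deleted asynchronously; when the directory is a bucket,
+// the whole collection is deleted on the master instead of deleting chunk by chunk.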
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
+ if p == "/" {
+ return nil
+ }
+
+ entry, findErr := f.FindEntry(ctx, p)
+ if findErr != nil {
+ return findErr
+ }
+
+ isCollection := f.isBucket(entry)
+
+ var chunks []*filer_pb.FileChunk
+ chunks = append(chunks, entry.Chunks...)
+ if entry.IsDirectory() {
+ // delete the folder children, not including the folder itself
+ var dirChunks []*filer_pb.FileChunk
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster)
+ if err != nil {
+ glog.V(0).Infof("delete directory %s: %v", p, err)
+ return fmt.Errorf("delete directory %s: %v", p, err)
+ }
+ chunks = append(chunks, dirChunks...)
+ }
+
+ // delete the file or folder
+ err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures)
+ if err != nil {
+ return fmt.Errorf("delete file %s: %v", p, err)
+ }
+
+ if shouldDeleteChunks && !isCollection {
+ go f.DeleteChunks(chunks)
+ }
+ if isCollection {
+ collectionName := entry.Name()
+ f.doDeleteCollection(collectionName)
+ f.deleteBucket(collectionName)
+ }
+
+ return nil
+}
+
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) {
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+ entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
+ return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
+
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ if sub.IsDirectory() {
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
+ chunks = append(chunks, dirChunks...)
+ } else {
+ f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
+ chunks = append(chunks, sub.Chunks...)
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, err
+ }
+ }
+
+ if len(entries) < PaginationSize {
+ break
+ }
+ }
+
+ glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
+
+ if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
+ return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+
+ f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil)
+
+ return chunks, nil
+}
+
+func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {
+
+ glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
+
+ if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ if !entry.IsDirectory() {
+ f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
+ }
+
+ return nil
+}
+
+func (f *Filer) doDeleteCollection(collectionName string) (err error) {
+
+ return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: collectionName,
+ })
+ if err != nil {
+ glog.Infof("delete collection %s: %v", collectionName, err)
+ }
+ return err
+ })
+
+}
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
new file mode 100644
index 000000000..126d162ec
--- /dev/null
+++ b/weed/filer/filer_deletion.go
@@ -0,0 +1,109 @@
+package filer
+
+import (
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) {
+ return func(vids []string) (map[string]operation.LookupResult, error) {
+ m := make(map[string]operation.LookupResult)
+ for _, vid := range vids {
+ locs, _ := masterClient.GetVidLocations(vid)
+ var locations []operation.Location
+ for _, loc := range locs {
+ locations = append(locations, operation.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ })
+ }
+ m[vid] = operation.LookupResult{
+ VolumeId: vid,
+ Locations: locations,
+ }
+ }
+ return m, nil
+ }
+}
+
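+// loopProcessingDeletion drains the file id deletion queue in batches of
+// DeletionBatchSize, sleeping briefly whenever a round deletes nothing.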
+func (f *Filer) loopProcessingDeletion() {
+
+ lookupFunc := LookupByMasterClientFn(f.MasterClient)
+
+ DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
+
+ var deletionCount int
+ for {
+ deletionCount = 0
+ f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+ for len(fileIds) > 0 {
+ var toDeleteFileIds []string
+ if len(fileIds) > DeletionBatchSize {
+ toDeleteFileIds = fileIds[:DeletionBatchSize]
+ fileIds = fileIds[DeletionBatchSize:]
+ } else {
+ toDeleteFileIds = fileIds
+ fileIds = fileIds[:0]
+ }
+ deletionCount = len(toDeleteFileIds)
+ _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+ if err != nil {
+ if !strings.Contains(err.Error(), "already deleted") {
+ glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ }
+ } else {
+ glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
+ }
+ }
+ })
+
+ if deletionCount == 0 {
+ time.Sleep(1123 * time.Millisecond)
+ }
+ }
+}
+
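+// DeleteChunks queues file chunks for deletion. A manifest chunk is resolved first,
+// so both the manifest itself and the data chunks it references are queued.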
+func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
+ for _, chunk := range chunks {
+ if !chunk.IsChunkManifest {
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ continue
+ }
+ dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
+ if manifestResolveErr != nil {
+ glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
+ }
+ for _, dChunk := range dataChunks {
+ f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
+ }
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ }
+}
+
+func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
+
+ if oldEntry == nil {
+ return
+ }
+ if newEntry == nil {
+ f.DeleteChunks(oldEntry.Chunks)
+ return
+ }
+
+ var toDelete []*filer_pb.FileChunk
+ newChunkIds := make(map[string]bool)
+ for _, newChunk := range newEntry.Chunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
+
+ for _, oldChunk := range oldEntry.Chunks {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
+ toDelete = append(toDelete, oldChunk)
+ }
+ }
+ f.DeleteChunks(toDelete)
+}
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
new file mode 100644
index 000000000..e00117382
--- /dev/null
+++ b/weed/filer/filer_notify.go
@@ -0,0 +1,169 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
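+// NotifyUpdateEvent publishes a metadata change to the configured notification queue,
+// if any, and appends it to the local meta log. Changes under SystemLogDir are skipped
+// so the metadata log does not notify about its own writes.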
+func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) {
+ var fullpath string
+ if oldEntry != nil {
+ fullpath = string(oldEntry.FullPath)
+ } else if newEntry != nil {
+ fullpath = string(newEntry.FullPath)
+ } else {
+ return
+ }
+
+ // println("fullpath:", fullpath)
+
+ if strings.HasPrefix(fullpath, SystemLogDir) {
+ return
+ }
+
+ newParentPath := ""
+ if newEntry != nil {
+ newParentPath, _ = newEntry.FullPath.DirAndName()
+ }
+ eventNotification := &filer_pb.EventNotification{
+ OldEntry: oldEntry.ToProtoEntry(),
+ NewEntry: newEntry.ToProtoEntry(),
+ DeleteChunks: deleteChunks,
+ NewParentPath: newParentPath,
+ IsFromOtherCluster: isFromOtherCluster,
+ Signatures: append(signatures, f.Signature),
+ }
+
+ if notification.Queue != nil {
+ glog.V(3).Infof("notifying entry update %v", fullpath)
+ notification.Queue.SendMessage(fullpath, eventNotification)
+ }
+
+ f.logMetaEvent(ctx, fullpath, eventNotification)
+
+}
+
+func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) {
+
+ dir, _ := util.FullPath(fullpath).DirAndName()
+
+ event := &filer_pb.SubscribeMetadataResponse{
+ Directory: dir,
+ EventNotification: eventNotification,
+ TsNs: time.Now().UnixNano(),
+ }
+ data, err := proto.Marshal(event)
+ if err != nil {
+ glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
+ return
+ }
+
+ f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
+
+}
+
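+// logFlushFunc persists a flushed meta log buffer as a per-minute segment file
+// named <SystemLogDir>/yyyy-MM-dd/HH-mm.segment, retrying until the append succeeds.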
+func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
+
+ startTime, stopTime = startTime.UTC(), stopTime.UTC()
+
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ // startTime.Second(), startTime.Nanosecond(),
+ )
+
+ for {
+ if err := f.appendToFile(targetFile, buf); err != nil {
+ glog.V(1).Infof("log write failed %s: %v", targetFile, err)
+ time.Sleep(737 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+}
+
+func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
+
+ startTime = startTime.UTC()
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
+ startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+
+ sizeBuf := make([]byte, 4)
+ startTsNs := startTime.UnixNano()
+
+ dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "")
+ if listDayErr != nil {
+ return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
+ }
+ for _, dayEntry := range dayEntries {
+ // println("checking day", dayEntry.FullPath)
+ hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "")
+ if listHourMinuteErr != nil {
+ return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
+ }
+ for _, hourMinuteEntry := range hourMinuteEntries {
+ // println("checking hh-mm", hourMinuteEntry.FullPath)
+ if dayEntry.Name() == startDate {
+ if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
+ continue
+ }
+ }
+ // println("processing", hourMinuteEntry.FullPath)
+ chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks)
+ if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ chunkedFileReader.Close()
+ if err == io.EOF {
+ continue
+ }
+ return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
+ }
+ chunkedFileReader.Close()
+ }
+ }
+
+ return lastTsNs, nil
+}
+
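+// ReadEachLogEntry reads log records from r, each encoded as a 4-byte size prefix
+// followed by a marshaled filer_pb.LogEntry. Records with TsNs at or before ns are
+// skipped; the timestamp of the last record passed to eachLogEntryFn is returned.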
+func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
+ for {
+ n, err := r.Read(sizeBuf)
+ if err != nil {
+ return lastTsNs, err
+ }
+ if n != 4 {
+ return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n)
+ }
+ size := util.BytesToUint32(sizeBuf)
+ // println("entry size", size)
+ entryData := make([]byte, size)
+ n, err = r.Read(entryData)
+ if err != nil {
+ return lastTsNs, err
+ }
+ if n != int(size) {
+ return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
+ }
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ return lastTsNs, err
+ }
+ if logEntry.TsNs <= ns {
+ continue
+ }
+ // println("each log: ", logEntry.TsNs)
+ if err := eachLogEntryFn(logEntry); err != nil {
+ return lastTsNs, err
+ } else {
+ lastTsNs = logEntry.TsNs
+ }
+ }
+}
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
new file mode 100644
index 000000000..b1836b046
--- /dev/null
+++ b/weed/filer/filer_notify_append.go
@@ -0,0 +1,73 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
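+// appendToFile uploads data as one new chunk and appends it at the current end
+// of the target file, creating the file entry first if it does not exist yet.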
+func (f *Filer) appendToFile(targetFile string, data []byte) error {
+
+ assignResult, uploadResult, err2 := f.assignAndUpload(data)
+ if err2 != nil {
+ return err2
+ }
+
+ // find out existing entry
+ fullpath := util.FullPath(targetFile)
+ entry, err := f.FindEntry(context.Background(), fullpath)
+ var offset int64 = 0
+ if err == filer_pb.ErrNotFound {
+ entry = &Entry{
+ FullPath: fullpath,
+ Attr: Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+ } else {
+ offset = int64(TotalSize(entry.Chunks))
+ }
+
+ // append to existing chunks
+ entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
+
+ // update the entry
+ err = f.CreateEntry(context.Background(), entry, false, false, nil)
+
+ return err
+}
+
+func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+ // assign a volume location
+ assignRequest := &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: f.metaLogCollection,
+ Replication: f.metaLogReplication,
+ WritableVolumeCount: 1,
+ }
+ assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("AssignVolume: %v", err)
+ }
+ if assignResult.Error != "" {
+ return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
+ }
+
+ // upload data
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}
diff --git a/weed/filer/filer_notify_test.go b/weed/filer/filer_notify_test.go
new file mode 100644
index 000000000..6a2be8f18
--- /dev/null
+++ b/weed/filer/filer_notify_test.go
@@ -0,0 +1,53 @@
+package filer
+
+import (
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
+ "github.com/golang/protobuf/proto"
+)
+
+func TestProtoMarshalText(t *testing.T) {
+
+ oldEntry := &Entry{
+ FullPath: util.FullPath("/this/path/to"),
+ Attr: Attr{
+ Mtime: time.Now(),
+ Mode: 0644,
+ Uid: 1,
+ Mime: "text/json",
+ TtlSec: 25,
+ },
+ Chunks: []*filer_pb.FileChunk{
+ &filer_pb.FileChunk{
+ FileId: "234,2423423422",
+ Offset: 234234,
+ Size: 234,
+ Mtime: 12312423,
+ ETag: "2342342354",
+ SourceFileId: "23234,2342342342",
+ },
+ },
+ }
+
+ notification := &filer_pb.EventNotification{
+ OldEntry: oldEntry.ToProtoEntry(),
+ NewEntry: nil,
+ DeleteChunks: true,
+ }
+
+ text := proto.MarshalTextString(notification)
+
+ notification2 := &filer_pb.EventNotification{}
+ proto.UnmarshalText(text, notification2)
+
+ if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId {
+ t.Fatalf("marshal/unmarshal error: %s", text)
+ }
+
+ println(text)
+
+}
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
new file mode 100644
index 000000000..48f7c99e4
--- /dev/null
+++ b/weed/filer/filerstore.go
@@ -0,0 +1,208 @@
+package filer
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type FilerStore interface {
+ // GetName gets the name used to locate the configuration in the filer.toml file
+ GetName() string
+ // Initialize initializes the file store
+ Initialize(configuration util.Configuration, prefix string) error
+ InsertEntry(context.Context, *Entry) error
+ UpdateEntry(context.Context, *Entry) (err error)
+ // err == filer_pb.ErrNotFound if not found
+ FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
+ DeleteEntry(context.Context, util.FullPath) (err error)
+ DeleteFolderChildren(context.Context, util.FullPath) (err error)
+ ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+ ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
+
+ BeginTransaction(ctx context.Context) (context.Context, error)
+ CommitTransaction(ctx context.Context) error
+ RollbackTransaction(ctx context.Context) error
+
+ Shutdown()
+}
+
+type FilerLocalStore interface {
+ UpdateOffset(filer string, lastTsNs int64) error
+ ReadOffset(filer string) (lastTsNs int64, err error)
+}
+
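+// FilerStoreWrapper decorates the actual store: it records per-operation counters
+// and latency histograms, normalizes chunk file ids before and after (de)serialization,
+// and falls back to client-side prefix filtering for stores that return
+// ErrUnsupportedListDirectoryPrefixed.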
+type FilerStoreWrapper struct {
+ ActualStore FilerStore
+}
+
+func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
+ if innerStore, ok := store.(*FilerStoreWrapper); ok {
+ return innerStore
+ }
+ return &FilerStoreWrapper{
+ ActualStore: store,
+ }
+}
+
+func (fsw *FilerStoreWrapper) GetName() string {
+ return fsw.ActualStore.GetName()
+}
+
+func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
+ return fsw.ActualStore.Initialize(configuration, prefix)
+}
+
+func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ if entry.Mime == "application/octet-stream" {
+ entry.Mime = ""
+ }
+ return fsw.ActualStore.InsertEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ if entry.Mime == "application/octet-stream" {
+ entry.Mime = ""
+ }
+ return fsw.ActualStore.UpdateEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
+ }()
+
+ entry, err = fsw.ActualStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return
+}
+
+func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ return fsw.ActualStore.DeleteEntry(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
+ }()
+
+ return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
+ }()
+
+ entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, err
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
+ }()
+ entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ if err == ErrUnsupportedListDirectoryPrefixed {
+ entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ }
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, nil
+}
+
+func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
+ entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+
+ if prefix == "" {
+ return
+ }
+
+ count := 0
+ var lastFileName string
+ notPrefixed := entries
+ entries = nil
+ for count < limit && len(notPrefixed) > 0 {
+ for _, entry := range notPrefixed {
+ lastFileName = entry.Name()
+ if strings.HasPrefix(entry.Name(), prefix) {
+ count++
+ entries = append(entries, entry)
+ if count >= limit {
+ break
+ }
+ }
+ }
+ if count < limit {
+ notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
+ if err != nil {
+ return
+ }
+ }
+ }
+ return
+}
+
+func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return fsw.ActualStore.BeginTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
+ return fsw.ActualStore.CommitTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
+ return fsw.ActualStore.RollbackTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) Shutdown() {
+ fsw.ActualStore.Shutdown()
+}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
new file mode 100644
index 000000000..eccb760a2
--- /dev/null
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -0,0 +1,231 @@
+package leveldb
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/syndtr/goleveldb/leveldb"
+ leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_FILE_SEPARATOR = byte(0x00)
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &LevelDBStore{})
+}
+
+type LevelDBStore struct {
+ db *leveldb.DB
+}
+
+func (store *LevelDBStore) GetName() string {
+ return "leveldb"
+}
+
+func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir)
+}
+
+func (store *LevelDBStore) initialize(dir string) (err error) {
+ glog.Infof("filer store dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10,
+ }
+
+ if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
+ if leveldb_errors.IsCorrupted(err) {
+ store.db, err = leveldb.RecoverFile(dir, opts)
+ }
+ if err != nil {
+ glog.Infof("filer store open dir %s: %v", dir, err)
+ return
+ }
+ }
+ return
+}
+
+func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDBStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ key := genKey(entry.DirAndName())
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.db.Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ key := genKey(fullpath.DirAndName())
+
+ data, err := store.db.Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ key := genKey(fullpath.DirAndName())
+
+ err = store.db.Delete(key, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+
+ batch := new(leveldb.Batch)
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(genKey(string(fullpath), fileName))
+ }
+ iter.Release()
+
+ err = store.db.Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ iter.Release()
+
+ return entries, err
+}
+
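+// keys are laid out as <directory path> + 0x00 + <file name>, so all entries under
+// one directory form a contiguous key range that can be scanned with a prefix iterator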
+func genKey(dirPath, fileName string) (key []byte) {
+ key = []byte(dirPath)
+ key = append(key, DIR_FILE_SEPARATOR)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = []byte(string(fullpath))
+ keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ sepIndex := len(key) - 1
+ for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
+ sepIndex--
+ }
+
+ return string(key[sepIndex+1:])
+
+}
+
+func (store *LevelDBStore) Shutdown() {
+ store.db.Close()
+}
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
new file mode 100644
index 000000000..df196b02e
--- /dev/null
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &LevelDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := testFiler.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking one upper directory
+ entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &LevelDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ // checking one upper directory
+ entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
diff --git a/weed/filer/leveldb2/leveldb2_local_store.go b/weed/filer/leveldb2/leveldb2_local_store.go
new file mode 100644
index 000000000..faae25c45
--- /dev/null
+++ b/weed/filer/leveldb2/leveldb2_local_store.go
@@ -0,0 +1,43 @@
+package leveldb
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ _ = filer.FilerLocalStore(&LevelDB2Store{})
+)
+
+func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
+
+ value := make([]byte, 8)
+ util.Uint64toBytes(value, uint64(lastTsNs))
+
+ err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
+
+ if err != nil {
+ return fmt.Errorf("UpdateOffset %s : %v", filer, err)
+ }
+
+ println("UpdateOffset", filer, "lastTsNs", lastTsNs)
+
+ return nil
+}
+
+func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
+
+ value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
+
+ if err != nil {
+ return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
+ }
+
+ lastTsNs = int64(util.BytesToUint64(value))
+
+ println("ReadOffset", filer, "lastTsNs", lastTsNs)
+
+ return
+}
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
new file mode 100644
index 000000000..7a2bdac2e
--- /dev/null
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -0,0 +1,251 @@
+package leveldb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "github.com/syndtr/goleveldb/leveldb"
+ leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &LevelDB2Store{})
+}
+
+type LevelDB2Store struct {
+ dbs []*leveldb.DB
+ dbCount int
+}
+
+func (store *LevelDB2Store) GetName() string {
+ return "leveldb2"
+}
+
+func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir, 8)
+}
+
+func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
+ glog.Infof("filer store leveldb2 dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+
+ for d := 0; d < dbCount; d++ {
+ dbFolder := fmt.Sprintf("%s/%02d", dir, d)
+ os.MkdirAll(dbFolder, 0755)
+ db, dbErr := leveldb.OpenFile(dbFolder, opts)
+ if leveldb_errors.IsCorrupted(dbErr) {
+ db, dbErr = leveldb.RecoverFile(dbFolder, opts)
+ }
+ if dbErr != nil {
+ glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
+ return dbErr
+ }
+ store.dbs = append(store.dbs, db)
+ }
+ store.dbCount = dbCount
+
+ return
+}
+
+func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.DirAndName()
+ key, partitionId := genKey(dir, name, store.dbCount)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.dbs[partitionId].Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ key, partitionId := genKey(dir, name, store.dbCount)
+
+ data, err := store.dbs[partitionId].Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ key, partitionId := genKey(dir, name, store.dbCount)
+
+ err = store.dbs[partitionId].Delete(key, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+
+ batch := new(leveldb.Batch)
+
+ iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(append(directoryPrefix, []byte(fileName)...))
+ }
+ iter.Release()
+
+ err = store.dbs[partitionId].Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+ lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+
+ iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ }
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+
+ if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ iter.Release()
+
+ return entries, err
+}
+
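+// keys are md5(directory path) + <file name>; the last byte of the md5 hash selects
+// one of the dbCount partitions, so all entries of a directory stay in one leveldb instance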
+func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
+ key, partitionId = hashToBytes(dirPath, dbCount)
+ key = append(key, []byte(fileName)...)
+ return key, partitionId
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
+ keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix, partitionId
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory, and use last byte for partitioning
+func hashToBytes(dir string, dbCount int) ([]byte, int) {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ x := b[len(b)-1]
+
+ return b, int(x) % dbCount
+}
+
+func (store *LevelDB2Store) Shutdown() {
+ for d := 0; d < store.dbCount; d++ {
+ store.dbs[d].Close()
+ }
+}
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
new file mode 100644
index 000000000..191de0040
--- /dev/null
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &LevelDB2Store{}
+ store.initialize(dir, 2)
+ testFiler.SetStore(store)
+
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := testFiler.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking the root directory
+ entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &LevelDB2Store{}
+ store.initialize(dir, 2)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ // checking the root directory
+ entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
new file mode 100644
index 000000000..506f03e4c
--- /dev/null
+++ b/weed/filer/meta_aggregator.go
@@ -0,0 +1,131 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+type MetaAggregator struct {
+ filers []string
+ grpcDialOption grpc.DialOption
+ MetaLogBuffer *log_buffer.LogBuffer
+ // notifying clients
+ ListenersLock sync.Mutex
+ ListenersCond *sync.Cond
+}
+
+// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
+// Older data comes from the metadata that each filer has already persisted locally on disk.
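+// Each peer filer is followed in its own goroutine; for peers other than this filer, the changes
+// are replayed into the local store and the last applied timestamp is checkpointed so a restart
+// can resume from it.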
+func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
+ t := &MetaAggregator{
+ filers: filers,
+ grpcDialOption: grpcDialOption,
+ }
+ t.ListenersCond = sync.NewCond(&t.ListenersLock)
+ t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
+ t.ListenersCond.Broadcast()
+ })
+ return t
+}
+
+func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
+ for _, filer := range ma.filers {
+ go ma.subscribeToOneFiler(f, self, filer)
+ }
+}
+
+func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
+
+ var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
+ lastPersistTime := time.Now()
+ changesSinceLastPersist := 0
+ lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
+
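+ // persist the applied offset after at most MaxChangeLimit replayed changes, or after a minute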
+ MaxChangeLimit := 100
+
+ if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
+ if self != filer {
+
+ if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
+ lastTsNs = prevTsNs
+ }
+
+ glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
+ maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
+ if err := Replay(f.Store.ActualStore, event); err != nil {
+ glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
+ return
+ }
+ changesSinceLastPersist++
+ if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
+ lastPersistTime = time.Now()
+ changesSinceLastPersist = 0
+ } else {
+ glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
+ }
+ }
+ }
+ } else {
+ glog.V(0).Infof("skipping following self: %v", self)
+ }
+ }
+
+ processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
+ data, err := proto.Marshal(event)
+ if err != nil {
+ glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
+ return err
+ }
+ dir := event.Directory
+ // println("received meta change", dir, "size", len(data))
+ ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
+ if maybeReplicateMetadataChange != nil {
+ maybeReplicateMetadataChange(event)
+ }
+ return nil
+ }
+
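+ // keep the subscription alive: on any error, log it, back off briefly, and resubscribe from lastTsNs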
+ for {
+ err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ClientName: "filer:" + self,
+ PathPrefix: "/",
+ SinceNs: lastTsNs,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
+ }
+ })
+ if err != nil {
+ glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
+ time.Sleep(1733 * time.Millisecond)
+ }
+ }
+}
diff --git a/weed/filer/meta_replay.go b/weed/filer/meta_replay.go
new file mode 100644
index 000000000..feb76278b
--- /dev/null
+++ b/weed/filer/meta_replay.go
@@ -0,0 +1,37 @@
+package filer
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ var oldPath util.FullPath
+ var newEntry *Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil {
+ return err
+ }
+ }
+
+ if message.NewEntry != nil {
+ dir := resp.Directory
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = FromPbEntry(dir, message.NewEntry)
+ if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
new file mode 100644
index 000000000..57d9a031d
--- /dev/null
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -0,0 +1,214 @@
+package mongodb
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/x/bsonx"
+ "time"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &MongodbStore{})
+}
+
+type MongodbStore struct {
+ connect *mongo.Client
+ database string
+ collectionName string
+}
+
+type Model struct {
+ Directory string `bson:"directory"`
+ Name string `bson:"name"`
+ Meta []byte `bson:"meta"`
+}
+
+func (store *MongodbStore) GetName() string {
+ return "mongodb"
+}
+
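+// Initialize reads its settings from the shared configuration under the given prefix,
+// e.g. (assuming a [mongodb] section in filer.toml; values are illustrative):
+//   uri = "mongodb://localhost:27017"
+//   option_pool_size = 0
+//   database = "seaweedfs"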
+func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ store.database = configuration.GetString(prefix + "database")
+ store.collectionName = "filemeta"
+ poolSize := configuration.GetInt(prefix + "option_pool_size")
+ return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize))
+}
+
+func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ opts := options.Client().ApplyURI(uri)
+
+ if poolSize > 0 {
+ opts.SetMaxPoolSize(poolSize)
+ }
+
+ client, err := mongo.Connect(ctx, opts)
+ if err != nil {
+ return err
+ }
+
+ c := client.Database(store.database).Collection(store.collectionName)
+ err = store.indexUnique(c)
+ store.connect = client
+ return err
+}
+
+func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error {
+ _, err := c.Indexes().CreateOne(context.Background(), index, opts)
+ return err
+}
+
+func (store *MongodbStore) indexUnique(c *mongo.Collection) error {
+ opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
+
+ unique := new(bool)
+ *unique = true
+
+ index := mongo.IndexModel{
+ Keys: bsonx.Doc{{Key: "directory", Value: bsonx.Int32(1)}, {Key: "name", Value: bsonx.Int32(1)}},
+ Options: &options.IndexOptions{
+ Unique: unique,
+ },
+ }
+
+ return store.createIndex(c, index, opts)
+}
+
+func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+
+func (store *MongodbStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ c := store.connect.Database(store.database).Collection(store.collectionName)
+
+ _, err = c.InsertOne(ctx, Model{
+ Directory: dir,
+ Name: name,
+ Meta: meta,
+ })
+ if err != nil {
+ return fmt.Errorf("insert %s : %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ dir, name := fullpath.DirAndName()
+ var data Model
+
+ var where = bson.M{"directory": dir, "name": name}
+ err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
+ if err != mongo.ErrNoDocuments && err != nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if len(data.Meta) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+
+ err = entry.DecodeAttributesAndChunks(data.Meta)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+ dir, name := fullpath.DirAndName()
+
+ where := bson.M{"directory": dir, "name": name}
+ _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+
+ where := bson.M{"directory": fullpath}
+ _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+
+ var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}
+ if inclusive {
+ where["name"] = bson.M{
+ "$gte": startFileName,
+ }
+ }
+ optLimit := int64(limit)
+ opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
+ cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+ for cur.Next(ctx) {
+ var data Model
+ err := cur.Decode(&data)
+ if err != nil && err != mongo.ErrNoDocuments {
+ return nil, err
+ }
+
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(fullpath), data.Name),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+
+ entries = append(entries, entry)
+ }
+
+ if err := cur.Close(ctx); err != nil {
+ glog.V(0).Infof("list iterator close: %v", err)
+ }
+
+ return entries, err
+}
+
+func (store *MongodbStore) Shutdown() {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ store.connect.Disconnect(ctx)
+}
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
new file mode 100644
index 000000000..708a67cc3
--- /dev/null
+++ b/weed/filer/mysql/mysql_store.go
@@ -0,0 +1,74 @@
+package mysql
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+ CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &MysqlStore{})
+}
+
+type MysqlStore struct {
+ abstract_sql.AbstractSqlStore
+}
+
+func (store *MysqlStore) GetName() string {
+ return "mysql"
+}
+
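+// Initialize reads its options from the shared configuration under the given prefix; the keys
+// (username, password, hostname, port, database, connection_max_idle, connection_max_open,
+// interpolateParams) would typically live in a [mysql] section of filer.toml (section name assumed).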
+func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetBool(prefix+"interpolateParams"),
+ )
+}
+
+func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
+ interpolateParams bool) (err error) {
+ //
+ store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
+ store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
+ store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+ store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
+ store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
+ store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
+
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ if interpolateParams {
+ sqlUrl += "&interpolateParams=true"
+ }
+
+ var dbErr error
+ store.DB, dbErr = sql.Open("mysql", sqlUrl)
+ if dbErr != nil {
+ store.DB.Close()
+ store.DB = nil
+ return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ }
+
+ store.DB.SetMaxIdleConns(maxIdle)
+ store.DB.SetMaxOpenConns(maxOpen)
+
+ if err = store.DB.Ping(); err != nil {
+ return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/permission.go b/weed/filer/permission.go
new file mode 100644
index 000000000..0d8b8292b
--- /dev/null
+++ b/weed/filer/permission.go
@@ -0,0 +1,22 @@
+package filer
+
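+// hasWritePermission checks the owner, group, and other write bits of the parent directory
+// against the uid/gid of the entry to be created under it.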
+func hasWritePermission(dir *Entry, entry *Entry) bool {
+
+ if dir == nil {
+ return false
+ }
+
+ if dir.Uid == entry.Uid && dir.Mode&0200 > 0 {
+ return true
+ }
+
+ if dir.Gid == entry.Gid && dir.Mode&0020 > 0 {
+ return true
+ }
+
+ if dir.Mode&0002 > 0 {
+ return true
+ }
+
+ return false
+}
diff --git a/weed/filer/postgres/README.txt b/weed/filer/postgres/README.txt
new file mode 100644
index 000000000..cb0c99c63
--- /dev/null
+++ b/weed/filer/postgres/README.txt
@@ -0,0 +1,17 @@
+
+1. create "seaweedfs" database
+
+export PGHOME=/Library/PostgreSQL/10
+$PGHOME/bin/createdb --username=postgres --password seaweedfs
+
+2. create "filemeta" table
+$PGHOME/bin/psql --username=postgres --password seaweedfs
+
+CREATE TABLE IF NOT EXISTS filemeta (
+ dirhash BIGINT,
+ name VARCHAR(65535),
+ directory VARCHAR(65535),
+ meta bytea,
+ PRIMARY KEY (dirhash, name)
+);
+
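+3. configure the filer store (example; the key names below are the ones read by
+postgres_store.go, while the [postgres] section name and the values are assumptions)
+
+[postgres]
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "seaweedfs"
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+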
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
new file mode 100644
index 000000000..4544c8416
--- /dev/null
+++ b/weed/filer/postgres/postgres_store.go
@@ -0,0 +1,75 @@
+package postgres
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ _ "github.com/lib/pq"
+)
+
+const (
+ CONNECTION_URL_PATTERN = "host=%s port=%d user=%s sslmode=%s connect_timeout=30"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &PostgresStore{})
+}
+
+type PostgresStore struct {
+ abstract_sql.AbstractSqlStore
+}
+
+func (store *PostgresStore) GetName() string {
+ return "postgres"
+}
+
+func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetString(prefix+"sslmode"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ )
+}
+
+func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
+
+ store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
+ store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
+ store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+ store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
+ store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%')ORDER BY NAME ASC LIMIT $5"
+ store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5"
+
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode)
+ if password != "" {
+ sqlUrl += " password=" + password
+ }
+ if database != "" {
+ sqlUrl += " dbname=" + database
+ }
+ var dbErr error
+ store.DB, dbErr = sql.Open("postgres", sqlUrl)
+ if dbErr != nil {
+ store.DB.Close()
+ store.DB = nil
+ return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ }
+
+ store.DB.SetMaxIdleConns(maxIdle)
+ store.DB.SetMaxOpenConns(maxOpen)
+
+ if err = store.DB.Ping(); err != nil {
+ return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
new file mode 100644
index 000000000..9f338782e
--- /dev/null
+++ b/weed/filer/reader_at.go
@@ -0,0 +1,149 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
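+// ChunkReadAt serves io.ReaderAt-style reads over a list of chunk views: each read locates the
+// covering chunks, fetches whole chunks (via the chunk cache when possible), and zero-fills the
+// gaps between chunks.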
+type ChunkReadAt struct {
+ masterClient *wdclient.MasterClient
+ chunkViews []*ChunkView
+ lookupFileId func(fileId string) (targetUrl string, err error)
+ readerLock sync.Mutex
+ fileSize int64
+
+ chunkCache chunk_cache.ChunkCache
+}
+
+// var _ = io.ReaderAt(&ChunkReadAt{})
+
+type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+
+func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+ return func(fileId string) (targetUrl string, err error) {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ vid := VolumeId(fileId)
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+
+ locations := resp.LocationsMap[vid]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", fileId)
+ return fmt.Errorf("failed to locate %s", fileId)
+ }
+
+ volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
+
+ targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+
+ return nil
+ })
+ return
+ }
+}
+
+func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
+
+ return &ChunkReadAt{
+ chunkViews: chunkViews,
+ lookupFileId: LookupFn(filerClient),
+ chunkCache: chunkCache,
+ fileSize: fileSize,
+ }
+}
+
+func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
+
+ c.readerLock.Lock()
+ defer c.readerLock.Unlock()
+
+ glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+ return c.doReadAt(p[n:], offset+int64(n))
+}
+
+func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
+
+ var buffer []byte
+ startOffset, remaining := offset, int64(len(p))
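+ // walk the chunk views in order: zero-fill any gap before a chunk, then copy the overlapping
+ // part of the chunk data into p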
+ for i, chunk := range c.chunkViews {
+ if remaining <= 0 {
+ break
+ }
+ if startOffset < chunk.LogicOffset {
+ gap := int(chunk.LogicOffset - startOffset)
+ glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
+ n += int(min(int64(gap), remaining))
+ startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
+ if remaining <= 0 {
+ break
+ }
+ }
+ // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
+ chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
+ if chunkStart >= chunkStop {
+ continue
+ }
+ glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+ buffer, err = c.readFromWholeChunkData(chunk)
+ if err != nil {
+ glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
+ return
+ }
+ bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
+ copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
+ n += copied
+ startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
+ }
+
+ glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+
+ if err == nil && remaining > 0 && c.fileSize > startOffset {
+ delta := int(min(remaining, c.fileSize-startOffset))
+ glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
+ n += delta
+ }
+
+ if err == nil && offset+int64(len(p)) > c.fileSize {
+ err = io.EOF
+ }
+ // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
+
+ return
+
+}
+
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
+
+ glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+
+ chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ if chunkData != nil {
+ glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
+ } else {
+ glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
+ chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
+ if err != nil {
+ return
+ }
+ c.chunkCache.SetChunk(chunkView.FileId, chunkData)
+ }
+
+ return
+}
+
+func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+
+ return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
+
+}
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
new file mode 100644
index 000000000..d4a34cbfe
--- /dev/null
+++ b/weed/filer/reader_at_test.go
@@ -0,0 +1,156 @@
+package filer
+
+import (
+ "fmt"
+ "io"
+ "math"
+ "strconv"
+ "sync"
+ "testing"
+)
+
+type mockChunkCache struct {
+}
+
+func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
+ x, _ := strconv.Atoi(fileId)
+ data = make([]byte, minSize)
+ for i := 0; i < int(minSize); i++ {
+ data[i] = byte(x)
+ }
+ return data
+}
+func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
+}
+
+func TestReaderAt(t *testing.T) {
+
+ visibles := []VisibleInterval{
+ {
+ start: 1,
+ stop: 2,
+ fileId: "1",
+ chunkSize: 9,
+ },
+ {
+ start: 3,
+ stop: 4,
+ fileId: "3",
+ chunkSize: 1,
+ },
+ {
+ start: 5,
+ stop: 6,
+ fileId: "5",
+ chunkSize: 2,
+ },
+ {
+ start: 7,
+ stop: 9,
+ fileId: "7",
+ chunkSize: 2,
+ },
+ {
+ start: 9,
+ stop: 10,
+ fileId: "9",
+ chunkSize: 2,
+ },
+ }
+
+ readerAt := &ChunkReadAt{
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ chunkCache: &mockChunkCache{},
+ }
+
+ testReadAt(t, readerAt, 0, 10, 10, nil)
+ testReadAt(t, readerAt, 0, 12, 10, io.EOF)
+ testReadAt(t, readerAt, 2, 8, 8, nil)
+ testReadAt(t, readerAt, 3, 6, 6, nil)
+
+}
+
+func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) {
+ data := make([]byte, size)
+ n, err := readerAt.ReadAt(data, offset)
+
+ for _, d := range data {
+ fmt.Printf("%x", d)
+ }
+ fmt.Println()
+
+ if expected != n {
+ t.Errorf("unexpected read size: %d, expect: %d", n, expected)
+ }
+ if err != expectedErr {
+ t.Errorf("unexpected read error: %v, expect: %v", err, expectedErr)
+ }
+
+}
+
+func TestReaderAt0(t *testing.T) {
+
+ visibles := []VisibleInterval{
+ {
+ start: 2,
+ stop: 5,
+ fileId: "1",
+ chunkSize: 9,
+ },
+ {
+ start: 7,
+ stop: 9,
+ fileId: "2",
+ chunkSize: 9,
+ },
+ }
+
+ readerAt := &ChunkReadAt{
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ chunkCache: &mockChunkCache{},
+ }
+
+ testReadAt(t, readerAt, 0, 10, 10, nil)
+ testReadAt(t, readerAt, 3, 16, 7, io.EOF)
+ testReadAt(t, readerAt, 3, 5, 5, nil)
+
+ testReadAt(t, readerAt, 11, 5, 0, io.EOF)
+ testReadAt(t, readerAt, 10, 5, 0, io.EOF)
+
+}
+
+func TestReaderAt1(t *testing.T) {
+
+ visibles := []VisibleInterval{
+ {
+ start: 2,
+ stop: 5,
+ fileId: "1",
+ chunkSize: 9,
+ },
+ }
+
+ readerAt := &ChunkReadAt{
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 20,
+ chunkCache: &mockChunkCache{},
+ }
+
+ testReadAt(t, readerAt, 0, 20, 20, nil)
+ testReadAt(t, readerAt, 1, 7, 7, nil)
+ testReadAt(t, readerAt, 0, 1, 1, nil)
+ testReadAt(t, readerAt, 18, 4, 2, io.EOF)
+ testReadAt(t, readerAt, 12, 4, 4, nil)
+ testReadAt(t, readerAt, 4, 20, 16, io.EOF)
+ testReadAt(t, readerAt, 4, 10, 10, nil)
+ testReadAt(t, readerAt, 1, 10, 10, nil)
+
+}
diff --git a/weed/filer/redis/redis_cluster_store.go b/weed/filer/redis/redis_cluster_store.go
new file mode 100644
index 000000000..8af94ee55
--- /dev/null
+++ b/weed/filer/redis/redis_cluster_store.go
@@ -0,0 +1,42 @@
+package redis
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisClusterStore{})
+}
+
+type RedisClusterStore struct {
+ UniversalRedisStore
+}
+
+func (store *RedisClusterStore) GetName() string {
+ return "redis_cluster"
+}
+
+func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", true)
+ configuration.SetDefault(prefix+"routeByLatency", true)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ )
+}
+
+func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ return
+}
diff --git a/weed/filer/redis/redis_store.go b/weed/filer/redis/redis_store.go
new file mode 100644
index 000000000..e152457ed
--- /dev/null
+++ b/weed/filer/redis/redis_store.go
@@ -0,0 +1,36 @@
+package redis
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisStore{})
+}
+
+type RedisStore struct {
+ UniversalRedisStore
+}
+
+func (store *RedisStore) GetName() string {
+ return "redis"
+}
+
+func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *RedisStore) initialize(hostPort string, password string, database int) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ return
+}
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
new file mode 100644
index 000000000..cc8819019
--- /dev/null
+++ b/weed/filer/redis/universal_redis_store.go
@@ -0,0 +1,191 @@
+package redis
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "strings"
+ "time"
+
+ "github.com/go-redis/redis"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
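+// Every entry is stored under its full path; in addition each directory keeps a Redis set,
+// keyed by the directory path plus DIR_LIST_MARKER, that holds the names of its direct children.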
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedisStore struct {
+ Client redis.UniversalClient
+}
+
+func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+ if name != "" {
+ _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
+ if err != nil {
+ return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ data, err := store.Client.Get(string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks([]byte(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ _, err = store.Client.Del(string(fullpath)).Result()
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ dir, name := fullpath.DirAndName()
+ if name != "" {
+ _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+
+ dirListKey := genDirectoryListKey(string(fullpath))
+ members, err := store.Client.SMembers(dirListKey).Result()
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ // skip
+ if startFileName != "" {
+ var t []string
+ for _, m := range members {
+ if strings.Compare(m, startFileName) >= 0 {
+ if m == startFileName {
+ if inclusive {
+ t = append(t, m)
+ }
+ } else {
+ t = append(t, m)
+ }
+ }
+ }
+ members = t
+ }
+
+ // sort
+ sort.Slice(members, func(i, j int) bool {
+ return strings.Compare(members[i], members[j]) < 0
+ })
+
+ // limit
+ if limit < len(members) {
+ members = members[:limit]
+ }
+
+ // fetch entry meta
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(string(path)).Result()
+ store.Client.SRem(dirListKey, fileName).Result()
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ }
+
+ return entries, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedisStore) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go
new file mode 100644
index 000000000..d155dbe88
--- /dev/null
+++ b/weed/filer/redis2/redis_cluster_store.go
@@ -0,0 +1,42 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisCluster2Store{})
+}
+
+type RedisCluster2Store struct {
+ UniversalRedis2Store
+}
+
+func (store *RedisCluster2Store) GetName() string {
+ return "redis_cluster2"
+}
+
+func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", true)
+ configuration.SetDefault(prefix+"routeByLatency", true)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ )
+}
+
+func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ return
+}
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
new file mode 100644
index 000000000..ed04c817b
--- /dev/null
+++ b/weed/filer/redis2/redis_store.go
@@ -0,0 +1,36 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Redis2Store{})
+}
+
+type Redis2Store struct {
+ UniversalRedis2Store
+}
+
+func (store *Redis2Store) GetName() string {
+ return "redis2"
+}
+
+func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ return
+}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
new file mode 100644
index 000000000..9e06ff68f
--- /dev/null
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -0,0 +1,166 @@
+package redis2
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-redis/redis"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
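+// redis2 keeps directory listings in a sorted set (ZADD/ZRANGE) instead of a plain set, so
+// children come back in lexicographic order without the client-side sort the redis store needs.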
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedis2Store struct {
+ Client redis.UniversalClient
+}
+
+func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+ if name != "" {
+ if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
+ return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ data, err := store.Client.Get(string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks([]byte(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ _, err = store.Client.Del(string(fullpath)).Result()
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ dir, name := fullpath.DirAndName()
+ if name != "" {
+ _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+
+ dirListKey := genDirectoryListKey(string(fullpath))
+ start := int64(0)
+ if startFileName != "" {
+ start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
+ if !inclusive {
+ start++
+ }
+ }
+ members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ // fetch entry meta
+ for _, fileName := range members {
+ path := util.NewFullPath(string(fullpath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(string(path)).Result()
+ store.Client.ZRem(dirListKey, fileName).Result()
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ }
+
+ return entries, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedis2Store) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
new file mode 100644
index 000000000..416359ebf
--- /dev/null
+++ b/weed/filer/stream.go
@@ -0,0 +1,204 @@
+package filer
+
+import (
+ "bytes"
+ "io"
+ "math"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
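+// StreamContent resolves the volume location of every chunk in the requested range first,
+// then streams the chunks into w in order.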
+func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+
+ // fmt.Printf("start to stream content for chunks: %+v\n", chunks)
+ chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
+
+ fileId2Url := make(map[string]string)
+
+ for _, chunkView := range chunkViews {
+
+ urlString, err := masterClient.LookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ fileId2Url[chunkView.FileId] = urlString
+ }
+
+ for _, chunkView := range chunkViews {
+
+ urlString := fileId2Url[chunkView.FileId]
+ err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ w.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ }
+
+ return nil
+
+}
+
+// ---------------- ReadAllReader ----------------------------------
+
+func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
+
+ buffer := bytes.Buffer{}
+
+ lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ return masterClient.LookupFileId(fileId)
+ }
+
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+ for _, chunkView := range chunkViews {
+ urlString, err := lookupFileIdFn(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return nil, err
+ }
+ err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return nil, err
+ }
+ }
+ return buffer.Bytes(), nil
+}
+
+// ---------------- ChunkStreamReader ----------------------------------
+type ChunkStreamReader struct {
+ chunkViews []*ChunkView
+ logicOffset int64
+ buffer []byte
+ bufferOffset int64
+ bufferPos int
+ chunkIndex int
+ lookupFileId LookupFileIdFunctionType
+}
+
+var _ = io.ReadSeeker(&ChunkStreamReader{})
+
+func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ return masterClient.LookupFileId(fileId)
+ }
+
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+ return &ChunkStreamReader{
+ chunkViews: chunkViews,
+ lookupFileId: lookupFileIdFn,
+ }
+}
+
+func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ lookupFileIdFn := LookupFn(filerClient)
+
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+ return &ChunkStreamReader{
+ chunkViews: chunkViews,
+ lookupFileId: lookupFileIdFn,
+ }
+}
+
+func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
+ for n < len(p) {
+ if c.isBufferEmpty() {
+ if c.chunkIndex >= len(c.chunkViews) {
+ return n, io.EOF
+ }
+ chunkView := c.chunkViews[c.chunkIndex]
+ c.fetchChunkToBuffer(chunkView)
+ c.chunkIndex++
+ }
+ t := copy(p[n:], c.buffer[c.bufferPos:])
+ c.bufferPos += t
+ n += t
+ }
+ return
+}
+
+func (c *ChunkStreamReader) isBufferEmpty() bool {
+ return len(c.buffer) <= c.bufferPos
+}
+
+func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
+
+ var totalSize int64
+ for _, chunk := range c.chunkViews {
+ totalSize += int64(chunk.Size)
+ }
+
+ var err error
+ switch whence {
+ case io.SeekStart:
+ case io.SeekCurrent:
+ offset += c.bufferOffset + int64(c.bufferPos)
+ case io.SeekEnd:
+ offset = totalSize + offset
+ }
+ if offset > totalSize {
+ err = io.ErrUnexpectedEOF
+ }
+
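+ // find the chunk view that covers the target offset and buffer it if it is not already buffered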
+ for i, chunk := range c.chunkViews {
+ if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+ if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
+ c.fetchChunkToBuffer(chunk)
+ c.chunkIndex = i + 1
+ break
+ }
+ }
+ }
+ c.bufferPos = int(offset - c.bufferOffset)
+
+ return offset, err
+
+}
+
+func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
+ urlString, err := c.lookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ var buffer bytes.Buffer
+ err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ c.buffer = buffer.Bytes()
+ c.bufferPos = 0
+ c.bufferOffset = chunkView.LogicOffset
+
+ // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+
+ return nil
+}
+
+func (c *ChunkStreamReader) Close() {
+ // TODO try to release and reuse buffer
+}
+
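+// VolumeId returns the volume id portion of a file id such as "3,01637037d6" (the part before the comma).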
+func VolumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}
diff --git a/weed/filer/topics.go b/weed/filer/topics.go
new file mode 100644
index 000000000..3a2fde8c4
--- /dev/null
+++ b/weed/filer/topics.go
@@ -0,0 +1,6 @@
+package filer
+
+const (
+ TopicsDir = "/topics"
+ SystemLogDir = TopicsDir + "/.system/log"
+)