Diffstat (limited to 'weed/filer')
42 files changed, 5453 insertions, 0 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go new file mode 100644 index 000000000..a6de2ea39 --- /dev/null +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -0,0 +1,206 @@ +package abstract_sql + +import ( + "context" + "database/sql" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type AbstractSqlStore struct { + DB *sql.DB + SqlInsert string + SqlUpdate string + SqlFind string + SqlDelete string + SqlDeleteFolderChildren string + SqlListExclusive string + SqlListInclusive string +} + +type TxOrDB interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) +} + +func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + ReadOnly: false, + }) + if err != nil { + return ctx, err + } + + return context.WithValue(ctx, "tx", tx), nil +} +func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Commit() + } + return nil +} +func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Rollback() + } + return nil +} + +func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx + } + return store.DB +} + +func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) + if err != nil { + return fmt.Errorf("insert %s: %s", entry.FullPath, err) + } + + affectedRows, err := res.RowsAffected() + if err == nil && affectedRows > 0 { + return nil + } + + // now the insert failed possibly due to duplication constraints + glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err) + + res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + if err != nil { + return fmt.Errorf("upsert %s: %s", entry.FullPath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err) + } + return nil + +} + +func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + if err != nil { + return fmt.Errorf("update %s: %s", entry.FullPath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) + } + return nil +} + +func (store *AbstractSqlStore) FindEntry(ctx context.Context, 
fullpath util.FullPath) (*filer.Entry, error) { + + dir, name := fullpath.DirAndName() + row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) + var data []byte + if err := row.Scan(&data); err != nil { + return nil, filer_pb.ErrNotFound + } + + entry := &filer.Entry{ + FullPath: fullpath, + } + if err := entry.DecodeAttributesAndChunks(data); err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { + + dir, name := fullpath.DirAndName() + + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir) + if err != nil { + return fmt.Errorf("delete %s: %s", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) + } + + return nil +} + +func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + } + + return nil +} + +func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + sqlText := store.SqlListExclusive + if inclusive { + sqlText = store.SqlListInclusive + } + + rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit) + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + defer rows.Close() + + for rows.Next() { + var name string + var data []byte + if err = rows.Scan(&name, &data); err != nil { + glog.V(0).Infof("scan %s : %v", fullpath, err) + return nil, fmt.Errorf("scan %s: %v", fullpath, err) + } + + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(fullpath), name), + } + if err = entry.DecodeAttributesAndChunks(data); err != nil { + glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) + return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + + entries = append(entries, entry) + } + + return entries, nil +} + +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { + return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "") +} + +func (store *AbstractSqlStore) Shutdown() { + store.DB.Close() +} diff --git a/weed/filer/cassandra/README.txt b/weed/filer/cassandra/README.txt new file mode 100644 index 000000000..122c9c3f4 --- /dev/null +++ b/weed/filer/cassandra/README.txt @@ -0,0 +1,14 @@ +1. create a keyspace + +CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1}; + +2. 
create filemeta table + + USE seaweedfs; + + CREATE TABLE filemeta ( + directory varchar, + name varchar, + meta blob, + PRIMARY KEY (directory, name) + ) WITH CLUSTERING ORDER BY (name ASC); diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go new file mode 100644 index 000000000..fd161b1f1 --- /dev/null +++ b/weed/filer/cassandra/cassandra_store.go @@ -0,0 +1,163 @@ +package cassandra + +import ( + "context" + "fmt" + "github.com/gocql/gocql" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + filer.Stores = append(filer.Stores, &CassandraStore{}) +} + +type CassandraStore struct { + cluster *gocql.ClusterConfig + session *gocql.Session +} + +func (store *CassandraStore) GetName() string { + return "cassandra" +} + +func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"keyspace"), + configuration.GetStringSlice(prefix+"hosts"), + ) +} + +func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) { + store.cluster = gocql.NewCluster(hosts...) + store.cluster.Keyspace = keyspace + store.cluster.Consistency = gocql.LocalQuorum + store.session, err = store.cluster.CreateSession() + if err != nil { + glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) + } + return +} + +func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *CassandraStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if err := store.session.Query( + "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ", + dir, name, meta, entry.TtlSec).Exec(); err != nil { + return fmt.Errorf("insert %s: %s", entry.FullPath, err) + } + + return nil +} + +func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + dir, name := fullpath.DirAndName() + var data []byte + if err := store.session.Query( + "SELECT meta FROM filemeta WHERE directory=? AND name=?", + dir, name).Consistency(gocql.One).Scan(&data); err != nil { + if err != gocql.ErrNotFound { + return nil, filer_pb.ErrNotFound + } + } + + if len(data) == 0 { + return nil, filer_pb.ErrNotFound + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(data) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { + + dir, name := fullpath.DirAndName() + + if err := store.session.Query( + "DELETE FROM filemeta WHERE directory=? 
AND name=?", + dir, name).Exec(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + + if err := store.session.Query( + "DELETE FROM filemeta WHERE directory=?", + fullpath).Exec(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer.Entry, err error) { + + cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" + if inclusive { + cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" + } + + var data []byte + var name string + iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter() + for iter.Scan(&name, &data) { + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(fullpath), name), + } + if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + if err := iter.Close(); err != nil { + glog.V(0).Infof("list iterator close: %v", err) + } + + return entries, err +} + +func (store *CassandraStore) Shutdown() { + store.session.Close() +} diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go new file mode 100644 index 000000000..3dce67d6d --- /dev/null +++ b/weed/filer/configuration.go @@ -0,0 +1,50 @@ +package filer + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" +) + +var ( + Stores []FilerStore +) + +func (f *Filer) LoadConfiguration(config *viper.Viper) { + + validateOneEnabledStore(config) + + for _, store := range Stores { + if config.GetBool(store.GetName() + ".enabled") { + if err := store.Initialize(config, store.GetName()+"."); err != nil { + glog.Fatalf("Failed to initialize store for %s: %+v", + store.GetName(), err) + } + f.SetStore(store) + glog.V(0).Infof("Configure filer for %s", store.GetName()) + return + } + } + + println() + println("Supported filer stores are:") + for _, store := range Stores { + println(" " + store.GetName()) + } + + os.Exit(-1) +} + +func validateOneEnabledStore(config *viper.Viper) { + enabledStore := "" + for _, store := range Stores { + if config.GetBool(store.GetName() + ".enabled") { + if enabledStore == "" { + enabledStore = store.GetName() + } else { + glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName()) + } + } + } +} diff --git a/weed/filer/entry.go b/weed/filer/entry.go new file mode 100644 index 000000000..4a73de19a --- /dev/null +++ b/weed/filer/entry.go @@ -0,0 +1,91 @@ +package filer + +import ( + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type Attr struct { + Mtime time.Time // time of last modification + Crtime time.Time // time of creation (OS X only) + Mode os.FileMode // file mode + Uid uint32 // owner uid + Gid uint32 // group gid + Mime string // mime type + Replication string // replication + Collection 
string // collection name + TtlSec int32 // ttl in seconds + UserName string + GroupNames []string + SymlinkTarget string + Md5 []byte + FileSize uint64 +} + +func (attr Attr) IsDirectory() bool { + return attr.Mode&os.ModeDir > 0 +} + +type Entry struct { + util.FullPath + + Attr + Extended map[string][]byte + + // the following is for files + Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` +} + +func (entry *Entry) Size() uint64 { + return maxUint64(TotalSize(entry.Chunks), entry.FileSize) +} + +func (entry *Entry) Timestamp() time.Time { + if entry.IsDirectory() { + return entry.Crtime + } else { + return entry.Mtime + } +} + +func (entry *Entry) ToProtoEntry() *filer_pb.Entry { + if entry == nil { + return nil + } + return &filer_pb.Entry{ + Name: entry.FullPath.Name(), + IsDirectory: entry.IsDirectory(), + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + } +} + +func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { + if entry == nil { + return nil + } + dir, _ := entry.FullPath.DirAndName() + return &filer_pb.FullEntry{ + Dir: dir, + Entry: entry.ToProtoEntry(), + } +} + +func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry { + return &Entry{ + FullPath: util.NewFullPath(dir, entry.Name), + Attr: PbToEntryAttribute(entry.Attributes), + Chunks: entry.Chunks, + } +} + +func maxUint64(x, y uint64) uint64 { + if x > y { + return x + } + return y +} diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go new file mode 100644 index 000000000..fb6448b30 --- /dev/null +++ b/weed/filer/entry_codec.go @@ -0,0 +1,124 @@ +package filer + +import ( + "bytes" + "fmt" + "os" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { + message := &filer_pb.Entry{ + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + } + return proto.Marshal(message) +} + +func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { + + message := &filer_pb.Entry{} + + if err := proto.UnmarshalMerge(blob, message); err != nil { + return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err) + } + + entry.Attr = PbToEntryAttribute(message.Attributes) + + entry.Extended = message.Extended + + entry.Chunks = message.Chunks + + return nil +} + +func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { + + return &filer_pb.FuseAttributes{ + Crtime: entry.Attr.Crtime.Unix(), + Mtime: entry.Attr.Mtime.Unix(), + FileMode: uint32(entry.Attr.Mode), + Uid: entry.Uid, + Gid: entry.Gid, + Mime: entry.Mime, + Collection: entry.Attr.Collection, + Replication: entry.Attr.Replication, + TtlSec: entry.Attr.TtlSec, + UserName: entry.Attr.UserName, + GroupName: entry.Attr.GroupNames, + SymlinkTarget: entry.Attr.SymlinkTarget, + Md5: entry.Attr.Md5, + FileSize: entry.Attr.FileSize, + } +} + +func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { + + t := Attr{} + + t.Crtime = time.Unix(attr.Crtime, 0) + t.Mtime = time.Unix(attr.Mtime, 0) + t.Mode = os.FileMode(attr.FileMode) + t.Uid = attr.Uid + t.Gid = attr.Gid + t.Mime = attr.Mime + t.Collection = attr.Collection + t.Replication = attr.Replication + t.TtlSec = attr.TtlSec + t.UserName = attr.UserName + t.GroupNames = attr.GroupName + t.SymlinkTarget = attr.SymlinkTarget + t.Md5 = attr.Md5 + t.FileSize = attr.FileSize + + return t +} + +func EqualEntry(a, b *Entry) bool { + if a == b { + return true + } + if a == nil && b != nil 
|| a != nil && b == nil { + return false + } + if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) { + return false + } + if len(a.Chunks) != len(b.Chunks) { + return false + } + + if !eq(a.Extended, b.Extended) { + return false + } + + if !bytes.Equal(a.Md5, b.Md5) { + return false + } + + for i := 0; i < len(a.Chunks); i++ { + if !proto.Equal(a.Chunks[i], b.Chunks[i]) { + return false + } + } + return true +} + +func eq(a, b map[string][]byte) bool { + if len(a) != len(b) { + return false + } + + for k, v := range a { + if w, ok := b[k]; !ok || !bytes.Equal(v, w) { + return false + } + } + + return true +} diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go new file mode 100644 index 000000000..36db4ac01 --- /dev/null +++ b/weed/filer/etcd/etcd_store.go @@ -0,0 +1,204 @@ +package etcd + +import ( + "context" + "fmt" + "strings" + "time" + + "go.etcd.io/etcd/clientv3" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_FILE_SEPARATOR = byte(0x00) +) + +func init() { + filer.Stores = append(filer.Stores, &EtcdStore{}) +} + +type EtcdStore struct { + client *clientv3.Client +} + +func (store *EtcdStore) GetName() string { + return "etcd" +} + +func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + servers := configuration.GetString(prefix + "servers") + if servers == "" { + servers = "localhost:2379" + } + + timeout := configuration.GetString(prefix + "timeout") + if timeout == "" { + timeout = "3s" + } + + return store.initialize(servers, timeout) +} + +func (store *EtcdStore) initialize(servers string, timeout string) (err error) { + glog.Infof("filer store etcd: %s", servers) + + to, err := time.ParseDuration(timeout) + if err != nil { + return fmt.Errorf("parse timeout %s: %s", timeout, err) + } + + store.client, err = clientv3.New(clientv3.Config{ + Endpoints: strings.Split(servers, ","), + DialTimeout: to, + }) + if err != nil { + return fmt.Errorf("connect to etcd %s: %s", servers, err) + } + + return +} + +func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *EtcdStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *EtcdStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + key := genKey(entry.DirAndName()) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if _, err := store.client.Put(ctx, string(key), string(value)); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + return nil +} + +func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + key := genKey(fullpath.DirAndName()) + + resp, err := store.client.Get(ctx, string(key)) + if err != nil { + return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + } + + if len(resp.Kvs) == 0 { + return nil, filer_pb.ErrNotFound + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value) + if err != 
nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + key := genKey(fullpath.DirAndName()) + + if _, err := store.client.Delete(ctx, string(key)); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { + return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err) + } + + return nil +} + +func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + resp, err := store.client.Get(ctx, string(directoryPrefix), + clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + + for _, kv := range resp.Kvs { + fileName := getNameFromKey(kv.Key) + if fileName == "" { + continue + } + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entry := &filer.Entry{ + FullPath: weed_util.NewFullPath(string(fullpath), fileName), + } + if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + + return entries, err +} + +func genKey(dirPath, fileName string) (key []byte) { + key = []byte(dirPath) + key = append(key, DIR_FILE_SEPARATOR) + key = append(key, []byte(fileName)...) + return key +} + +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = []byte(string(fullpath)) + keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) 
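+ // keys are laid out as "<directory>\x00<fileName>", so this prefix bounds an etcd range scan to a single directory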
+ } + return keyPrefix +} + +func getNameFromKey(key []byte) string { + sepIndex := len(key) - 1 + for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR { + sepIndex-- + } + + return string(key[sepIndex+1:]) +} + +func (store *EtcdStore) Shutdown() { + store.client.Close() +} diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go new file mode 100644 index 000000000..e84cf21e5 --- /dev/null +++ b/weed/filer/filechunk_manifest.go @@ -0,0 +1,168 @@ +package filer + +import ( + "bytes" + "fmt" + "io" + "math" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + ManifestBatch = 1000 +) + +func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { + for _, chunk := range chunks { + if chunk.IsChunkManifest { + return true + } + } + return false +} + +func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) { + for _, c := range chunks { + if c.IsChunkManifest { + manifestChunks = append(manifestChunks, c) + } else { + nonManifestChunks = append(nonManifestChunks, c) + } + } + return +} + +func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { + // TODO maybe parallel this + for _, chunk := range chunks { + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + continue + } + + resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) + if err != nil { + return chunks, nil, err + } + + manifestChunks = append(manifestChunks, chunk) + // recursive + dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks) + if subErr != nil { + return chunks, nil, subErr + } + dataChunks = append(dataChunks, dchunks...) + manifestChunks = append(manifestChunks, mchunks...) + } + return +} + +func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { + if !chunk.IsChunkManifest { + return + } + + // IsChunkManifest + data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + if err != nil { + return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) + } + m := &filer_pb.FileChunkManifest{} + if err := proto.Unmarshal(data, m); err != nil { + return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) + } + + // recursive + filer_pb.AfterEntryDeserialization(m.Chunks) + return m.Chunks, nil +} + +// TODO fetch from cache for weed mount? 
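+// fetchChunk resolves the chunk's file id to a volume server URL via lookupFileIdFn,
+// then streams the chunk body into memory; ReadUrlAsStream handles decryption
+// (cipherKey) and decompression (isGzipped) while reading, and readDeleted=true
+// also retrieves chunks that are deleted but not yet purged.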
+func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { + urlString, err := lookupFileIdFn(fileId) + if err != nil { + glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) + return nil, err + } + var buffer bytes.Buffer + err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", fileId, err) + return nil, err + } + + return buffer.Bytes(), nil +} + +func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { + return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest) +} + +func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) { + + var dataChunks []*filer_pb.FileChunk + for _, chunk := range inputChunks { + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + } else { + chunks = append(chunks, chunk) + } + } + + remaining := len(dataChunks) + for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor { + chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor]) + if err != nil { + return dataChunks, err + } + chunks = append(chunks, chunk) + remaining -= mergeFactor + } + // remaining + for i := len(dataChunks) - remaining; i < len(dataChunks); i++ { + chunks = append(chunks, dataChunks[i]) + } + return +} + +func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { + + filer_pb.BeforeEntrySerialization(dataChunks) + + // create and serialize the manifest + data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{ + Chunks: dataChunks, + }) + if serErr != nil { + return nil, fmt.Errorf("serializing manifest: %v", serErr) + } + + minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) + for _, chunk := range dataChunks { + if minOffset > int64(chunk.Offset) { + minOffset = chunk.Offset + } + if maxOffset < int64(chunk.Size)+chunk.Offset { + maxOffset = int64(chunk.Size) + chunk.Offset + } + } + + manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0) + if err != nil { + return nil, err + } + manifestChunk.IsChunkManifest = true + manifestChunk.Offset = minOffset + manifestChunk.Size = uint64(maxOffset - minOffset) + + return +} + +type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) diff --git a/weed/filer/filechunk_manifest_test.go b/weed/filer/filechunk_manifest_test.go new file mode 100644 index 000000000..ce12c5da6 --- /dev/null +++ b/weed/filer/filechunk_manifest_test.go @@ -0,0 +1,113 @@ +package filer + +import ( + "bytes" + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestDoMaybeManifestize(t *testing.T) { + var manifestTests = []struct { + inputs []*filer_pb.FileChunk + expected []*filer_pb.FileChunk + }{ + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: false}, + {FileId: "2", IsChunkManifest: false}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + 
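+ // with mergeFactor 2, consecutive data chunks fold pairwise: "1"+"2" -> "12", "3"+"4" -> "34"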
expected: []*filer_pb.FileChunk{ + {FileId: "12", IsChunkManifest: true}, + {FileId: "34", IsChunkManifest: true}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: false}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "23", IsChunkManifest: true}, + {FileId: "4", IsChunkManifest: false}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: false}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "2", IsChunkManifest: true}, + {FileId: "13", IsChunkManifest: true}, + {FileId: "4", IsChunkManifest: false}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "34", IsChunkManifest: true}, + }, + }, + } + + for i, mtest := range manifestTests { + println("test", i) + actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge) + assertEqualChunks(t, mtest.expected, actual) + } + +} + +func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) { + assert.Equal(t, len(expected), len(actual)) + for i := 0; i < len(actual); i++ { + assertEqualChunk(t, actual[i], expected[i]) + } +} +func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) { + assert.Equal(t, expected.FileId, actual.FileId) + assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest) +} + +func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { + + var buf bytes.Buffer + minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) + for k := 0; k < len(dataChunks); k++ { + chunk := dataChunks[k] + buf.WriteString(chunk.FileId) + if minOffset > int64(chunk.Offset) { + minOffset = chunk.Offset + } + if maxOffset < int64(chunk.Size)+chunk.Offset { + maxOffset = int64(chunk.Size) + chunk.Offset + } + } + + manifestChunk = &filer_pb.FileChunk{ + FileId: buf.String(), + } + manifestChunk.IsChunkManifest = true + manifestChunk.Offset = minOffset + manifestChunk.Size = uint64(maxOffset - minOffset) + + return +} diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go new file mode 100644 index 000000000..c45963193 --- /dev/null +++ b/weed/filer/filechunks.go @@ -0,0 +1,284 @@ +package filer + +import ( + "fmt" + "hash/fnv" + "math" + "sort" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { + for _, c := range chunks { + t := uint64(c.Offset + int64(c.Size)) + if size < t { + size = t + } + } + return +} + +func FileSize(entry *filer_pb.Entry) (size uint64) { + return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize) +} + +func ETag(entry *filer_pb.Entry) (etag string) { + if entry.Attributes == nil || entry.Attributes.Md5 == nil { + return ETagChunks(entry.Chunks) + } + return fmt.Sprintf("%x", entry.Attributes.Md5) +} + +func ETagEntry(entry *Entry) (etag string) { + if entry.Attr.Md5 == nil { + return ETagChunks(entry.Chunks) + } + return fmt.Sprintf("%x", entry.Attr.Md5) +} + +func 
ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { + if len(chunks) == 1 { + return chunks[0].ETag + } + + h := fnv.New32a() + for _, c := range chunks { + h.Write([]byte(c.ETag)) + } + return fmt.Sprintf("%x", h.Sum32()) +} + +func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { + + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) + + fileIds := make(map[string]bool) + for _, interval := range visibles { + fileIds[interval.fileId] = true + } + for _, chunk := range chunks { + if _, found := fileIds[chunk.GetFileIdString()]; found { + compacted = append(compacted, chunk) + } else { + garbage = append(garbage, chunk) + } + } + + return +} + +func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { + + aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as) + if aErr != nil { + return nil, aErr + } + bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs) + if bErr != nil { + return nil, bErr + } + + delta = append(delta, DoMinusChunks(aData, bData)...) + delta = append(delta, DoMinusChunks(aMeta, bMeta)...) + return +} + +func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { + + fileIds := make(map[string]bool) + for _, interval := range bs { + fileIds[interval.GetFileIdString()] = true + } + for _, chunk := range as { + if _, found := fileIds[chunk.GetFileIdString()]; !found { + delta = append(delta, chunk) + } + } + + return +} + +type ChunkView struct { + FileId string + Offset int64 + Size uint64 + LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk + ChunkSize uint64 + CipherKey []byte + IsGzipped bool +} + +func (cv *ChunkView) IsFullChunk() bool { + return cv.Size == cv.ChunkSize +} + +func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { + + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) + + return ViewFromVisibleIntervals(visibles, offset, size) + +} + +func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) { + + stop := offset + size + if size == math.MaxInt64 { + stop = math.MaxInt64 + } + if stop < offset { + stop = math.MaxInt64 + } + + for _, chunk := range visibles { + + chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop) + + if chunkStart < chunkStop { + views = append(views, &ChunkView{ + FileId: chunk.fileId, + Offset: chunkStart - chunk.start + chunk.chunkOffset, + Size: uint64(chunkStop - chunkStart), + LogicOffset: chunkStart, + ChunkSize: chunk.chunkSize, + CipherKey: chunk.cipherKey, + IsGzipped: chunk.isGzipped, + }) + } + } + + return views + +} + +func logPrintf(name string, visibles []VisibleInterval) { + + /* + glog.V(0).Infof("%s len %d", name, len(visibles)) + for _, v := range visibles { + glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset) + } + */ +} + +var bufPool = sync.Pool{ + New: func() interface{} { + return new(VisibleInterval) + }, +} + +func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) { + + newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed) + + length := len(visibles) + if length == 0 { 
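+ // no visible intervals yet: the new chunk is the only visible range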
+ return append(visibles, newV) + } + last := visibles[length-1] + if last.stop <= chunk.Offset { + return append(visibles, newV) + } + + logPrintf(" before", visibles) + // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size) + chunkStop := chunk.Offset + int64(chunk.Size) + for _, v := range visibles { + if v.start < chunk.Offset && chunk.Offset < v.stop { + t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped) + newVisibles = append(newVisibles, t) + // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop) + } + if v.start < chunkStop && chunkStop < v.stop { + t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped) + newVisibles = append(newVisibles, t) + // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop) + } + if chunkStop <= v.start || v.stop <= chunk.Offset { + newVisibles = append(newVisibles, v) + // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop) + } + } + newVisibles = append(newVisibles, newV) + + logPrintf(" append", newVisibles) + + for i := len(newVisibles) - 1; i >= 0; i-- { + if i > 0 && newV.start < newVisibles[i-1].start { + newVisibles[i] = newVisibles[i-1] + } else { + newVisibles[i] = newV + break + } + } + logPrintf(" sorted", newVisibles) + + return newVisibles +} + +// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory +// If the file chunk content is a chunk manifest +func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) { + + chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks) + + sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Mtime == chunks[j].Mtime { + return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey + } + return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run + }) + + for _, chunk := range chunks { + + // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) + visibles = MergeIntoVisibles(visibles, chunk) + + logPrintf("add", visibles) + + } + + return +} + +// find non-overlapping visible intervals +// visible interval map to one file chunk + +type VisibleInterval struct { + start int64 + stop int64 + modifiedTime int64 + fileId string + chunkOffset int64 + chunkSize uint64 + cipherKey []byte + isGzipped bool +} + +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { + return VisibleInterval{ + start: start, + stop: stop, + fileId: fileId, + modifiedTime: modifiedTime, + chunkOffset: chunkOffset, // the starting position in the chunk + chunkSize: chunkSize, + cipherKey: cipherKey, + isGzipped: isGzipped, + } +} + +func min(x, y int64) int64 { + if x <= y { + return x + } + return y +} +func max(x, y int64) int64 { + if x <= y { + return y + } + return x +} diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go new file mode 100644 index 000000000..9f9566d9b --- /dev/null +++ b/weed/filer/filechunks2_test.go @@ -0,0 +1,46 @@ +package filer + +import ( + "sort" + "testing" + + "github.com/chrislusf/seaweedfs/weed/glog" + 
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestCompactFileChunksRealCase(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + {FileId: "2,512f31f2c0700a", Offset: 0, Size: 25 - 0, Mtime: 5320497}, + {FileId: "6,512f2c2e24e9e8", Offset: 868352, Size: 917585 - 868352, Mtime: 5320492}, + {FileId: "7,514468dd5954ca", Offset: 884736, Size: 901120 - 884736, Mtime: 5325928}, + {FileId: "5,5144463173fe77", Offset: 917504, Size: 2297856 - 917504, Mtime: 5325894}, + {FileId: "4,51444c7ab54e2d", Offset: 2301952, Size: 2367488 - 2301952, Mtime: 5325900}, + {FileId: "4,514450e643ad22", Offset: 2371584, Size: 2420736 - 2371584, Mtime: 5325904}, + {FileId: "6,514456a5e9e4d7", Offset: 2449408, Size: 2490368 - 2449408, Mtime: 5325910}, + {FileId: "3,51444f8d53eebe", Offset: 2494464, Size: 2555904 - 2494464, Mtime: 5325903}, + {FileId: "4,5144578b097c7e", Offset: 2560000, Size: 2596864 - 2560000, Mtime: 5325911}, + {FileId: "3,51445500b6b4ac", Offset: 2637824, Size: 2678784 - 2637824, Mtime: 5325909}, + {FileId: "1,51446285e52a61", Offset: 2695168, Size: 2715648 - 2695168, Mtime: 5325922}, + } + + printChunks("before", chunks) + + compacted, garbage := CompactFileChunks(nil, chunks) + + printChunks("compacted", compacted) + printChunks("garbage", garbage) + +} + +func printChunks(name string, chunks []*filer_pb.FileChunk) { + sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Offset == chunks[j].Offset { + return chunks[i].Mtime < chunks[j].Mtime + } + return chunks[i].Offset < chunks[j].Offset + }) + for _, chunk := range chunks { + glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) + } +} diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go new file mode 100644 index 000000000..699e7e298 --- /dev/null +++ b/weed/filer/filechunks_test.go @@ -0,0 +1,539 @@ +package filer + +import ( + "fmt" + "log" + "math" + "math/rand" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestCompactFileChunks(t *testing.T) { + chunks := []*filer_pb.FileChunk{ + {Offset: 10, Size: 100, FileId: "abc", Mtime: 50}, + {Offset: 100, Size: 100, FileId: "def", Mtime: 100}, + {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200}, + {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300}, + } + + compacted, garbage := CompactFileChunks(nil, chunks) + + if len(compacted) != 3 { + t.Fatalf("unexpected compacted: %d", len(compacted)) + } + if len(garbage) != 1 { + t.Fatalf("unexpected garbage: %d", len(garbage)) + } + +} + +func TestCompactFileChunks2(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 50}, + {Offset: 100, Size: 100, FileId: "def", Mtime: 100}, + {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200}, + {Offset: 0, Size: 100, FileId: "abcf", Mtime: 300}, + {Offset: 50, Size: 100, FileId: "fhfh", Mtime: 400}, + {Offset: 100, Size: 100, FileId: "yuyu", Mtime: 500}, + } + + k := 3 + + for n := 0; n < k; n++ { + chunks = append(chunks, &filer_pb.FileChunk{ + Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n), + }) + chunks = append(chunks, &filer_pb.FileChunk{ + Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k), + }) + } + + compacted, garbage := CompactFileChunks(nil, chunks) + + if len(compacted) != 4 { + t.Fatalf("unexpected compacted: %d", len(compacted)) + } + if len(garbage) != 8 { + t.Fatalf("unexpected 
garbage: %d", len(garbage)) + } +} + +func TestRandomFileChunksCompact(t *testing.T) { + + data := make([]byte, 1024) + + var chunks []*filer_pb.FileChunk + for i := 0; i < 15; i++ { + start, stop := rand.Intn(len(data)), rand.Intn(len(data)) + if start > stop { + start, stop = stop, start + } + if start+16 < stop { + stop = start + 16 + } + chunk := &filer_pb.FileChunk{ + FileId: strconv.Itoa(i), + Offset: int64(start), + Size: uint64(stop - start), + Mtime: int64(i), + Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + chunks = append(chunks, chunk) + for x := start; x < stop; x++ { + data[x] = byte(i) + } + } + + visibles, _ := NonOverlappingVisibleIntervals(nil, chunks) + + for _, v := range visibles { + for x := v.start; x < v.stop; x++ { + assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId) + } + } + +} + +func TestIntervalMerging(t *testing.T) { + + testcases := []struct { + Chunks []*filer_pb.FileChunk + Expected []*VisibleInterval + }{ + // case 0: normal + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, + {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 100, fileId: "abc"}, + {start: 100, stop: 200, fileId: "asdf"}, + {start: 200, stop: 300, fileId: "fsad"}, + }, + }, + // case 1: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 200, fileId: "asdf"}, + }, + }, + // case 2: updates overwrite part of previous chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 0, Size: 70, FileId: "b", Mtime: 134}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 70, fileId: "b"}, + {start: 70, stop: 100, fileId: "a", chunkOffset: 70}, + }, + }, + // case 3: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 50, fileId: "asdf"}, + {start: 50, stop: 300, fileId: "xxxx"}, + }, + }, + // case 4: updates far away from prev chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 200, fileId: "asdf"}, + {start: 250, stop: 500, fileId: "xxxx"}, + }, + }, + // case 5: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "d", Mtime: 184}, + {Offset: 70, Size: 150, FileId: "c", Mtime: 143}, + {Offset: 80, Size: 100, FileId: "b", Mtime: 134}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 200, fileId: "d"}, + {start: 200, stop: 220, fileId: "c", chunkOffset: 130}, + }, + }, + // case 6: same updates + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 100, fileId: "xyz"}, + }, + }, 
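+ // for case 6 above: on equal Mtime the sort falls back to Fid.FileKey, so "xyz" (file key 3) wins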
+ // case 7: real updates + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123}, + {Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130}, + {Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140}, + {Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150}, + {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160}, + {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 2097152, fileId: "3,029565bf3092"}, + {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"}, + {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"}, + {start: 8388608, stop: 11534336, fileId: "5,02982f80de50"}, + {start: 11534336, stop: 14376529, fileId: "7,0299ad723803"}, + }, + }, + // case 8: real bug + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 77824, FileId: "4,0b3df938e301", Mtime: 123}, + {Offset: 471040, Size: 472225 - 471040, FileId: "6,0b3e0650019c", Mtime: 130}, + {Offset: 77824, Size: 208896 - 77824, FileId: "4,0b3f0c7202f0", Mtime: 140}, + {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150}, + {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160}, + }, + Expected: []*VisibleInterval{ + {start: 0, stop: 77824, fileId: "4,0b3df938e301"}, + {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"}, + {start: 208896, stop: 339968, fileId: "2,0b4031a72689"}, + {start: 339968, stop: 471040, fileId: "3,0b416a557362"}, + {start: 471040, stop: 472225, fileId: "6,0b3e0650019c"}, + }, + }, + } + + for i, testcase := range testcases { + log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) + intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks) + for x, interval := range intervals { + log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", + i, x, interval.start, interval.stop, interval.fileId) + } + for x, interval := range intervals { + if interval.start != testcase.Expected[x].start { + t.Fatalf("failed on test case %d, interval %d, start %d, expect %d", + i, x, interval.start, testcase.Expected[x].start) + } + if interval.stop != testcase.Expected[x].stop { + t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d", + i, x, interval.stop, testcase.Expected[x].stop) + } + if interval.fileId != testcase.Expected[x].fileId { + t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s", + i, x, interval.fileId, testcase.Expected[x].fileId) + } + if interval.chunkOffset != testcase.Expected[x].chunkOffset { + t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d", + i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset) + } + } + if len(intervals) != len(testcase.Expected) { + t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected)) + } + + } + +} + +func TestChunksReading(t *testing.T) { + + testcases := []struct { + Chunks []*filer_pb.FileChunk + Offset int64 + Size int64 + Expected []*ChunkView + }{ + // case 0: normal + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, + {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, + }, + Offset: 0, + Size: 250, + Expected: []*ChunkView{ + {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, + {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, + {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 
200}, + }, + }, + // case 1: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + }, + Offset: 50, + Size: 100, + Expected: []*ChunkView{ + {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50}, + }, + }, + // case 2: updates overwrite part of previous chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 3, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 10, Size: 50, FileId: "b", Mtime: 134}, + }, + Offset: 30, + Size: 40, + Expected: []*ChunkView{ + {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30}, + {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60}, + }, + }, + // case 3: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, + }, + Offset: 0, + Size: 200, + Expected: []*ChunkView{ + {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0}, + {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50}, + }, + }, + // case 4: updates far away from prev chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, + }, + Offset: 0, + Size: 400, + Expected: []*ChunkView{ + {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0}, + {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250}, + }, + }, + // case 5: updates overwrite full chunks + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, + {Offset: 0, Size: 200, FileId: "c", Mtime: 184}, + {Offset: 70, Size: 150, FileId: "b", Mtime: 143}, + {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, + }, + Offset: 0, + Size: 220, + Expected: []*ChunkView{ + {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0}, + {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200}, + }, + }, + // case 6: same updates + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123}, + }, + Offset: 0, + Size: 100, + Expected: []*ChunkView{ + {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0}, + }, + }, + // case 7: edge cases + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, + {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, + }, + Offset: 0, + Size: 200, + Expected: []*ChunkView{ + {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, + {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, + }, + }, + // case 8: edge cases + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 90, Size: 200, FileId: "asdf", Mtime: 134}, + {Offset: 190, Size: 300, FileId: "fsad", Mtime: 353}, + }, + Offset: 0, + Size: 300, + Expected: []*ChunkView{ + {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0}, + {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90}, + {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190}, + }, + }, + // case 9: edge cases + { + Chunks: []*filer_pb.FileChunk{ + {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1}, + {Offset: 43175936, Size: 52981771 - 43175936, FileId: 
"2,112a36ea7f85", Mtime: 2}, + {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3}, + {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4}, + {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5}, + {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6}, + }, + Offset: 0, + Size: 153578836, + Expected: []*ChunkView{ + {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0}, + {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936}, + {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760}, + {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736}, + {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168}, + {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248}, + }, + }, + } + + for i, testcase := range testcases { + if i != 2 { + // continue + } + log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) + chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size) + for x, chunk := range chunks { + log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", + i, x, chunk.Offset, chunk.Size, chunk.FileId) + if chunk.Offset != testcase.Expected[x].Offset { + t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d", + i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset) + } + if chunk.Size != testcase.Expected[x].Size { + t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d", + i, chunk.FileId, chunk.Size, testcase.Expected[x].Size) + } + if chunk.FileId != testcase.Expected[x].FileId { + t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s", + i, x, chunk.FileId, testcase.Expected[x].FileId) + } + if chunk.LogicOffset != testcase.Expected[x].LogicOffset { + t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d", + i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset) + } + } + if len(chunks) != len(testcase.Expected) { + t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected)) + } + } + +} + +func BenchmarkCompactFileChunks(b *testing.B) { + + var chunks []*filer_pb.FileChunk + + k := 1024 + + for n := 0; n < k; n++ { + chunks = append(chunks, &filer_pb.FileChunk{ + Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n), + }) + chunks = append(chunks, &filer_pb.FileChunk{ + Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k), + }) + } + + for n := 0; n < b.N; n++ { + CompactFileChunks(nil, chunks) + } +} + +func TestViewFromVisibleIntervals(t *testing.T) { + visibles := []VisibleInterval{ + { + start: 0, + stop: 25, + fileId: "fid1", + }, + { + start: 4096, + stop: 8192, + fileId: "fid2", + }, + { + start: 16384, + stop: 18551, + fileId: "fid3", + }, + } + + views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) + + if len(views) != len(visibles) { + assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + } + +} + +func TestViewFromVisibleIntervals2(t *testing.T) { + visibles := []VisibleInterval{ + { + start: 344064, + stop: 348160, + fileId: "fid1", + }, + { + start: 348160, + stop: 356352, + fileId: "fid2", + }, + } + + views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) + + if len(views) != len(visibles) { + assert.Equal(t, len(visibles), 
len(views), "ViewFromVisibleIntervals error") + } + +} + +func TestViewFromVisibleIntervals3(t *testing.T) { + visibles := []VisibleInterval{ + { + start: 1000, + stop: 2000, + fileId: "fid1", + }, + { + start: 3000, + stop: 4000, + fileId: "fid2", + }, + } + + views := ViewFromVisibleIntervals(visibles, 1700, 1500) + + if len(views) != len(visibles) { + assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") + } + +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go new file mode 100644 index 000000000..71da1a080 --- /dev/null +++ b/weed/filer/filer.go @@ -0,0 +1,290 @@ +package filer + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" + "github.com/chrislusf/seaweedfs/weed/wdclient" +) + +const ( + LogFlushInterval = time.Minute + PaginationSize = 1024 * 256 +) + +var ( + OS_UID = uint32(os.Getuid()) + OS_GID = uint32(os.Getgid()) + ErrUnsupportedListDirectoryPrefixed = errors.New("UNSUPPORTED") +) + +type Filer struct { + Store *FilerStoreWrapper + MasterClient *wdclient.MasterClient + fileIdDeletionQueue *util.UnboundedQueue + GrpcDialOption grpc.DialOption + DirBucketsPath string + FsyncBuckets []string + buckets *FilerBuckets + Cipher bool + LocalMetaLogBuffer *log_buffer.LogBuffer + metaLogCollection string + metaLogReplication string + MetaAggregator *MetaAggregator + Signature int32 +} + +func NewFiler(masters []string, grpcDialOption grpc.DialOption, + filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { + f := &Filer{ + MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), + fileIdDeletionQueue: util.NewUnboundedQueue(), + GrpcDialOption: grpcDialOption, + Signature: util.RandomInt32(), + } + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) + f.metaLogCollection = collection + f.metaLogReplication = replication + + go f.loopProcessingDeletion() + + return f +} + +func (f *Filer) AggregateFromPeers(self string, filers []string) { + + // set peers + if len(filers) == 0 { + filers = append(filers, self) + } + f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) + f.MetaAggregator.StartLoopSubscribe(f, self) + +} + +func (f *Filer) SetStore(store FilerStore) { + f.Store = NewFilerStoreWrapper(store) +} + +func (f *Filer) GetStore() (store FilerStore) { + return f.Store +} + +func (fs *Filer) GetMaster() string { + return fs.MasterClient.GetMaster() +} + +func (fs *Filer) KeepConnectedToMaster() { + fs.MasterClient.KeepConnectedToMaster() +} + +func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { + return f.Store.BeginTransaction(ctx) +} + +func (f *Filer) CommitTransaction(ctx context.Context) error { + return f.Store.CommitTransaction(ctx) +} + +func (f *Filer) RollbackTransaction(ctx context.Context) error { + return f.Store.RollbackTransaction(ctx) +} + +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { + + if string(entry.FullPath) == "/" { + return nil + } + + dirParts := strings.Split(string(entry.FullPath), "/") + + // fmt.Printf("directory parts: %+v\n", dirParts) + + var lastDirectoryEntry *Entry + + for i := 1; i < len(dirParts); i++ { + dirPath := "/" 
+ util.Join(dirParts[:i]...) + // fmt.Printf("%d directory: %+v\n", i, dirPath) + + // check the store directly + glog.V(4).Infof("find uncached directory: %s", dirPath) + dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) + + // no such existing directory + if dirEntry == nil { + + // create the directory + now := time.Now() + + dirEntry = &Entry{ + FullPath: util.FullPath(dirPath), + Attr: Attr{ + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0110, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, + UserName: entry.UserName, + GroupNames: entry.GroupNames, + }, + } + + glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) + mkdirErr := f.Store.InsertEntry(ctx, dirEntry) + if mkdirErr != nil { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { + glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) + return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) + } + } else { + f.maybeAddBucket(dirEntry) + f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) + } + + } else if !dirEntry.IsDirectory() { + glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) + return fmt.Errorf("%s is a file", dirPath) + } + + // remember the direct parent directory entry + if i == len(dirParts)-1 { + lastDirectoryEntry = dirEntry + } + + } + + if lastDirectoryEntry == nil { + glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath) + return fmt.Errorf("parent folder not found: %v", entry.FullPath) + } + + /* + if !hasWritePermission(lastDirectoryEntry, entry) { + glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d", + lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid) + return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath) + } + */ + + oldEntry, _ := f.FindEntry(ctx, entry.FullPath) + + glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl) + if oldEntry == nil { + if err := f.Store.InsertEntry(ctx, entry); err != nil { + glog.Errorf("insert entry %s: %v", entry.FullPath, err) + return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) + } + } else { + if o_excl { + glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath) + return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath) + } + if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil { + glog.Errorf("update entry %s: %v", entry.FullPath, err) + return fmt.Errorf("update entry %s: %v", entry.FullPath, err) + } + } + + f.maybeAddBucket(entry) + f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) + + f.deleteChunksIfNotNew(oldEntry, entry) + + glog.V(4).Infof("CreateEntry %s: created", entry.FullPath) + + return nil +} + +func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { + if oldEntry != nil { + if oldEntry.IsDirectory() && !entry.IsDirectory() { + glog.Errorf("existing %s is a directory", entry.FullPath) + return fmt.Errorf("existing %s is a directory", entry.FullPath) + } + if !oldEntry.IsDirectory() && entry.IsDirectory() { + glog.Errorf("existing %s is a file", entry.FullPath) + return fmt.Errorf("existing %s is a file", entry.FullPath) + } + } + return f.Store.UpdateEntry(ctx, entry) +} + +func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) { + + now := time.Now() + + if string(p) == "/" { + return &Entry{ + FullPath: p, + Attr: Attr{ + Mtime: now, + 
Crtime: now,
+				Mode:   os.ModeDir | 0755,
+				Uid:    OS_UID,
+				Gid:    OS_GID,
+			},
+		}, nil
+	}
+	entry, err = f.Store.FindEntry(ctx, p)
+	if entry != nil && entry.TtlSec > 0 {
+		if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+			// p is already the entry's full path; p.Child(entry.Name()) would append the name twice
+			f.Store.DeleteEntry(ctx, p)
+			return nil, filer_pb.ErrNotFound
+		}
+	}
+	return
+
+}
+
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) {
+	if strings.HasSuffix(string(p), "/") && len(p) > 1 {
+		p = p[0 : len(p)-1]
+	}
+
+	var makeupEntries []*Entry
+	entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
+	for expiredCount > 0 && err == nil {
+		makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
+		if err == nil {
+			entries = append(entries, makeupEntries...)
+		}
+	}
+
+	return entries, err
+}
+
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
+	listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
+	if listErr != nil {
+		return listedEntries, expiredCount, "", listErr
+	}
+	for _, entry := range listedEntries {
+		lastFileName = entry.Name()
+		if entry.TtlSec > 0 {
+			if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+				f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
+				expiredCount++
+				continue
+			}
+		}
+		entries = append(entries, entry)
+	}
+	return
+}
+
+func (f *Filer) Shutdown() {
+	f.LocalMetaLogBuffer.Shutdown()
+	f.Store.Shutdown()
+}
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
new file mode 100644
index 000000000..4d4f4abc3
--- /dev/null
+++ b/weed/filer/filer_buckets.go
@@ -0,0 +1,121 @@
+package filer
+
+import (
+	"context"
+	"math"
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type BucketName string
+type BucketOption struct {
+	Name        BucketName
+	Replication string
+	fsync       bool
+}
+type FilerBuckets struct {
+	dirBucketsPath string
+	buckets        map[BucketName]*BucketOption
+	sync.RWMutex
+}
+
+func (f *Filer) LoadBuckets() {
+
+	f.buckets = &FilerBuckets{
+		buckets: make(map[BucketName]*BucketOption),
+	}
+
+	limit := math.MaxInt32
+
+	entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
+
+	if err != nil {
+		glog.V(1).Infof("no buckets found: %v", err)
+		return
+	}
+
+	shouldFsyncMap := make(map[string]bool)
+	for _, bucket := range f.FsyncBuckets {
+		shouldFsyncMap[bucket] = true
+	}
+
+	glog.V(1).Infof("buckets found: %d", len(entries))
+
+	f.buckets.Lock()
+	for _, entry := range entries {
+		_, shouldFsync := shouldFsyncMap[entry.Name()]
+		f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
+			Name:        BucketName(entry.Name()),
+			Replication: entry.Replication,
+			fsync:       shouldFsync,
+		}
+	}
+	f.buckets.Unlock()
+
+}
+
+func (f *Filer) ReadBucketOption(bucketName string) (replication string, fsync bool) {
+
+	f.buckets.RLock()
+	defer f.buckets.RUnlock()
+
+	option, found := f.buckets.buckets[BucketName(bucketName)]
+
+	if !found {
+		return "", false
+	}
+	return option.Replication, option.fsync
+
+}
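+
+// Editor's note, an illustrative use of the cache above (not part of this
+// commit); an unknown bucket simply yields the zero values:
+//
+//	replication, fsync := f.ReadBucketOption("my-bucket")
+//	// replication == "" and fsync == false when "my-bucket" was never loaded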
+
+func (f *Filer) isBucket(entry *Entry) bool {
+	if !entry.IsDirectory() {
+		return false
+	}
+	parent, dirName := entry.FullPath.DirAndName()
+	if parent != f.DirBucketsPath {
+		return false
+	}
+
+	f.buckets.RLock()
+	defer f.buckets.RUnlock()
+
+	_, found := f.buckets.buckets[BucketName(dirName)]
+
+	return found
+
+}
+
+func (f *Filer) maybeAddBucket(entry *Entry) {
+	if !entry.IsDirectory() {
+		return
+	}
+	parent, dirName := entry.FullPath.DirAndName()
+	if parent != f.DirBucketsPath {
+		return
+	}
+	f.addBucket(dirName, &BucketOption{
+		Name:        BucketName(dirName),
+		Replication: entry.Replication,
+	})
+}
+
+func (f *Filer) addBucket(bucketName string, bucketOption *BucketOption) {
+
+	f.buckets.Lock()
+	defer f.buckets.Unlock()
+
+	f.buckets.buckets[BucketName(bucketName)] = bucketOption
+
+}
+
+func (f *Filer) deleteBucket(bucketName string) {
+
+	f.buckets.Lock()
+	defer f.buckets.Unlock()
+
+	delete(f.buckets.buckets, BucketName(bucketName))
+
+}
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
new file mode 100644
index 000000000..e2198bd21
--- /dev/null
+++ b/weed/filer/filer_delete_entry.go
@@ -0,0 +1,129 @@
+package filer
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
+	if p == "/" {
+		return nil
+	}
+
+	entry, findErr := f.FindEntry(ctx, p)
+	if findErr != nil {
+		return findErr
+	}
+
+	isCollection := f.isBucket(entry)
+
+	var chunks []*filer_pb.FileChunk
+	chunks = append(chunks, entry.Chunks...)
+	if entry.IsDirectory() {
+		// delete the folder children, not including the folder itself
+		var dirChunks []*filer_pb.FileChunk
+		dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster)
+		if err != nil {
+			glog.V(0).Infof("delete directory %s: %v", p, err)
+			return fmt.Errorf("delete directory %s: %v", p, err)
+		}
+		chunks = append(chunks, dirChunks...)
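+		// Editor's note: dirChunks carries every file id gathered while the
+		// children were traversed and their metadata deleted; it must be
+		// merged here because the store can no longer enumerate the subtree,
+		// and the data itself is only reclaimed by DeleteChunks further down.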
+ } + + // delete the file or folder + err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures) + if err != nil { + return fmt.Errorf("delete file %s: %v", p, err) + } + + if shouldDeleteChunks && !isCollection { + go f.DeleteChunks(chunks) + } + if isCollection { + collectionName := entry.Name() + f.doDeleteCollection(collectionName) + f.deleteBucket(collectionName) + } + + return nil +} + +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) { + + lastFileName := "" + includeLastFile := false + for { + entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "") + if err != nil { + glog.Errorf("list folder %s: %v", entry.FullPath, err) + return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) + } + if lastFileName == "" && !isRecursive && len(entries) > 0 { + // only for first iteration in the loop + glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) + return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + } + + for _, sub := range entries { + lastFileName = sub.Name() + var dirChunks []*filer_pb.FileChunk + if sub.IsDirectory() { + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false) + chunks = append(chunks, dirChunks...) + } else { + f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) + chunks = append(chunks, sub.Chunks...) + } + if err != nil && !ignoreRecursiveError { + return nil, err + } + } + + if len(entries) < PaginationSize { + break + } + } + + glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) + + if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { + return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) + } + + f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil) + + return chunks, nil +} + +func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) { + + glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) + + if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { + return fmt.Errorf("filer store delete: %v", storeDeletionErr) + } + if !entry.IsDirectory() { + f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) + } + + return nil +} + +func (f *Filer) doDeleteCollection(collectionName string) (err error) { + + return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ + Name: collectionName, + }) + if err != nil { + glog.Infof("delete collection %s: %v", collectionName, err) + } + return err + }) + +} diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go new file mode 100644 index 000000000..126d162ec --- /dev/null +++ b/weed/filer/filer_deletion.go @@ -0,0 +1,109 @@ +package filer + +import ( + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + 
"github.com/chrislusf/seaweedfs/weed/wdclient" +) + +func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) { + return func(vids []string) (map[string]operation.LookupResult, error) { + m := make(map[string]operation.LookupResult) + for _, vid := range vids { + locs, _ := masterClient.GetVidLocations(vid) + var locations []operation.Location + for _, loc := range locs { + locations = append(locations, operation.Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + }) + } + m[vid] = operation.LookupResult{ + VolumeId: vid, + Locations: locations, + } + } + return m, nil + } +} + +func (f *Filer) loopProcessingDeletion() { + + lookupFunc := LookupByMasterClientFn(f.MasterClient) + + DeletionBatchSize := 100000 // roughly 20 bytes cost per file id. + + var deletionCount int + for { + deletionCount = 0 + f.fileIdDeletionQueue.Consume(func(fileIds []string) { + for len(fileIds) > 0 { + var toDeleteFileIds []string + if len(fileIds) > DeletionBatchSize { + toDeleteFileIds = fileIds[:DeletionBatchSize] + fileIds = fileIds[DeletionBatchSize:] + } else { + toDeleteFileIds = fileIds + fileIds = fileIds[:0] + } + deletionCount = len(toDeleteFileIds) + _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc) + if err != nil { + if !strings.Contains(err.Error(), "already deleted") { + glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) + } + } else { + glog.V(1).Infof("deleting fileIds len=%d", deletionCount) + } + } + }) + + if deletionCount == 0 { + time.Sleep(1123 * time.Millisecond) + } + } +} + +func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { + for _, chunk := range chunks { + if !chunk.IsChunkManifest { + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString()) + } + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + } +} + +func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { + + if oldEntry == nil { + return + } + if newEntry == nil { + f.DeleteChunks(oldEntry.Chunks) + } + + var toDelete []*filer_pb.FileChunk + newChunkIds := make(map[string]bool) + for _, newChunk := range newEntry.Chunks { + newChunkIds[newChunk.GetFileIdString()] = true + } + + for _, oldChunk := range oldEntry.Chunks { + if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { + toDelete = append(toDelete, oldChunk) + } + } + f.DeleteChunks(toDelete) +} diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go new file mode 100644 index 000000000..e00117382 --- /dev/null +++ b/weed/filer/filer_notify.go @@ -0,0 +1,169 @@ +package filer + +import ( + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) { + var fullpath string + if oldEntry != nil { + fullpath = string(oldEntry.FullPath) + } else if newEntry != nil { + fullpath = 
string(newEntry.FullPath) + } else { + return + } + + // println("fullpath:", fullpath) + + if strings.HasPrefix(fullpath, SystemLogDir) { + return + } + + newParentPath := "" + if newEntry != nil { + newParentPath, _ = newEntry.FullPath.DirAndName() + } + eventNotification := &filer_pb.EventNotification{ + OldEntry: oldEntry.ToProtoEntry(), + NewEntry: newEntry.ToProtoEntry(), + DeleteChunks: deleteChunks, + NewParentPath: newParentPath, + IsFromOtherCluster: isFromOtherCluster, + Signatures: append(signatures, f.Signature), + } + + if notification.Queue != nil { + glog.V(3).Infof("notifying entry update %v", fullpath) + notification.Queue.SendMessage(fullpath, eventNotification) + } + + f.logMetaEvent(ctx, fullpath, eventNotification) + +} + +func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) { + + dir, _ := util.FullPath(fullpath).DirAndName() + + event := &filer_pb.SubscribeMetadataResponse{ + Directory: dir, + EventNotification: eventNotification, + TsNs: time.Now().UnixNano(), + } + data, err := proto.Marshal(event) + if err != nil { + glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err) + return + } + + f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) + +} + +func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { + + startTime, stopTime = startTime.UTC(), stopTime.UTC() + + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + // startTime.Second(), startTime.Nanosecond(), + ) + + for { + if err := f.appendToFile(targetFile, buf); err != nil { + glog.V(1).Infof("log write failed %s: %v", targetFile, err) + time.Sleep(737 * time.Millisecond) + } else { + break + } + } +} + +func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { + + startTime = startTime.UTC() + startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + + sizeBuf := make([]byte, 4) + startTsNs := startTime.UnixNano() + + dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "") + if listDayErr != nil { + return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr) + } + for _, dayEntry := range dayEntries { + // println("checking day", dayEntry.FullPath) + hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "") + if listHourMinuteErr != nil { + return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) + } + for _, hourMinuteEntry := range hourMinuteEntries { + // println("checking hh-mm", hourMinuteEntry.FullPath) + if dayEntry.Name() == startDate { + if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 { + continue + } + } + // println("processing", hourMinuteEntry.FullPath) + chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) + if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + chunkedFileReader.Close() + if err == io.EOF { + continue + } + return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) + } + 
chunkedFileReader.Close() + } + } + + return lastTsNs, nil +} + +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { + for { + n, err := r.Read(sizeBuf) + if err != nil { + return lastTsNs, err + } + if n != 4 { + return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n) + } + size := util.BytesToUint32(sizeBuf) + // println("entry size", size) + entryData := make([]byte, size) + n, err = r.Read(entryData) + if err != nil { + return lastTsNs, err + } + if n != int(size) { + return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) + } + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + return lastTsNs, err + } + if logEntry.TsNs <= ns { + return lastTsNs, nil + } + // println("each log: ", logEntry.TsNs) + if err := eachLogEntryFn(logEntry); err != nil { + return lastTsNs, err + } else { + lastTsNs = logEntry.TsNs + } + } +} diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go new file mode 100644 index 000000000..b1836b046 --- /dev/null +++ b/weed/filer/filer_notify_append.go @@ -0,0 +1,73 @@ +package filer + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (f *Filer) appendToFile(targetFile string, data []byte) error { + + assignResult, uploadResult, err2 := f.assignAndUpload(data) + if err2 != nil { + return err2 + } + + // find out existing entry + fullpath := util.FullPath(targetFile) + entry, err := f.FindEntry(context.Background(), fullpath) + var offset int64 = 0 + if err == filer_pb.ErrNotFound { + entry = &Entry{ + FullPath: fullpath, + Attr: Attr{ + Crtime: time.Now(), + Mtime: time.Now(), + Mode: os.FileMode(0644), + Uid: OS_UID, + Gid: OS_GID, + }, + } + } else { + offset = int64(TotalSize(entry.Chunks)) + } + + // append to existing chunks + entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) + + // update the entry + err = f.CreateEntry(context.Background(), entry, false, false, nil) + + return err +} + +func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { + // assign a volume location + assignRequest := &operation.VolumeAssignRequest{ + Count: 1, + Collection: f.metaLogCollection, + Replication: f.metaLogReplication, + WritableVolumeCount: 1, + } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + if err != nil { + return nil, nil, fmt.Errorf("AssignVolume: %v", err) + } + if assignResult.Error != "" { + return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error) + } + + // upload data + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) + if err != nil { + return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + // println("uploaded to", targetUrl) + return assignResult, uploadResult, nil +} diff --git a/weed/filer/filer_notify_test.go b/weed/filer/filer_notify_test.go new file mode 100644 index 000000000..6a2be8f18 --- /dev/null +++ b/weed/filer/filer_notify_test.go @@ -0,0 +1,53 @@ +package filer + +import ( + "testing" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + + 
"github.com/golang/protobuf/proto" +) + +func TestProtoMarshalText(t *testing.T) { + + oldEntry := &Entry{ + FullPath: util.FullPath("/this/path/to"), + Attr: Attr{ + Mtime: time.Now(), + Mode: 0644, + Uid: 1, + Mime: "text/json", + TtlSec: 25, + }, + Chunks: []*filer_pb.FileChunk{ + &filer_pb.FileChunk{ + FileId: "234,2423423422", + Offset: 234234, + Size: 234, + Mtime: 12312423, + ETag: "2342342354", + SourceFileId: "23234,2342342342", + }, + }, + } + + notification := &filer_pb.EventNotification{ + OldEntry: oldEntry.ToProtoEntry(), + NewEntry: nil, + DeleteChunks: true, + } + + text := proto.MarshalTextString(notification) + + notification2 := &filer_pb.EventNotification{} + proto.UnmarshalText(text, notification2) + + if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId { + t.Fatalf("marshal/unmarshal error: %s", text) + } + + println(text) + +} diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go new file mode 100644 index 000000000..48f7c99e4 --- /dev/null +++ b/weed/filer/filerstore.go @@ -0,0 +1,208 @@ +package filer + +import ( + "context" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type FilerStore interface { + // GetName gets the name to locate the configuration in filer.toml file + GetName() string + // Initialize initializes the file store + Initialize(configuration util.Configuration, prefix string) error + InsertEntry(context.Context, *Entry) error + UpdateEntry(context.Context, *Entry) (err error) + // err == filer2.ErrNotFound if not found + FindEntry(context.Context, util.FullPath) (entry *Entry, err error) + DeleteEntry(context.Context, util.FullPath) (err error) + DeleteFolderChildren(context.Context, util.FullPath) (err error) + ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) + + BeginTransaction(ctx context.Context) (context.Context, error) + CommitTransaction(ctx context.Context) error + RollbackTransaction(ctx context.Context) error + + Shutdown() +} + +type FilerLocalStore interface { + UpdateOffset(filer string, lastTsNs int64) error + ReadOffset(filer string) (lastTsNs int64, err error) +} + +type FilerStoreWrapper struct { + ActualStore FilerStore +} + +func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { + if innerStore, ok := store.(*FilerStoreWrapper); ok { + return innerStore + } + return &FilerStoreWrapper{ + ActualStore: store, + } +} + +func (fsw *FilerStoreWrapper) GetName() string { + return fsw.ActualStore.GetName() +} + +func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { + return fsw.ActualStore.Initialize(configuration, prefix) +} + +func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } + return fsw.ActualStore.InsertEntry(ctx, entry) +} + +func (fsw 
*FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } + return fsw.ActualStore.UpdateEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds()) + }() + + entry, err = fsw.ActualStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + filer_pb.AfterEntryDeserialization(entry.Chunks) + return +} + +func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + return fsw.ActualStore.DeleteEntry(ctx, fp) +} + +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) + }() + + return fsw.ActualStore.DeleteFolderChildren(ctx, fp) +} + +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds()) + }() + + entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, err +} + +func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { + stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) + }() + entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + if err == ErrUnsupportedListDirectoryPrefixed { + entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + } + if err != nil { + return nil, err + } + for _, entry := range entries { + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, nil +} + +func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile 
bool, limit int, prefix string) (entries []*Entry, err error) { + entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + + if prefix == "" { + return + } + + count := 0 + var lastFileName string + notPrefixed := entries + entries = nil + for count < limit && len(notPrefixed) > 0 { + for _, entry := range notPrefixed { + lastFileName = entry.Name() + if strings.HasPrefix(entry.Name(), prefix) { + count++ + entries = append(entries, entry) + if count >= limit { + break + } + } + } + if count < limit { + notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) + if err != nil { + return + } + } + } + return +} + +func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { + return fsw.ActualStore.BeginTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { + return fsw.ActualStore.CommitTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { + return fsw.ActualStore.RollbackTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) Shutdown() { + fsw.ActualStore.Shutdown() +} diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go new file mode 100644 index 000000000..eccb760a2 --- /dev/null +++ b/weed/filer/leveldb/leveldb_store.go @@ -0,0 +1,231 @@ +package leveldb + +import ( + "bytes" + "context" + "fmt" + "github.com/syndtr/goleveldb/leveldb" + leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" + leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_FILE_SEPARATOR = byte(0x00) +) + +func init() { + filer.Stores = append(filer.Stores, &LevelDBStore{}) +} + +type LevelDBStore struct { + db *leveldb.DB +} + +func (store *LevelDBStore) GetName() string { + return "leveldb" +} + +func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") + return store.initialize(dir) +} + +func (store *LevelDBStore) initialize(dir string) (err error) { + glog.Infof("filer store dir: %s", dir) + if err := weed_util.TestFolderWritable(dir); err != nil { + return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) + } + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, + } + + if store.db, err = leveldb.OpenFile(dir, opts); err != nil { + if leveldb_errors.IsCorrupted(err) { + store.db, err = leveldb.RecoverFile(dir, opts) + } + if err != nil { + glog.Infof("filer store open dir %s: %v", dir, err) + return + } + } + return +} + +func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *LevelDBStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + key := genKey(entry.DirAndName()) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return 
fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + err = store.db.Put(key, value, nil) + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) + + return nil +} + +func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + key := genKey(fullpath.DirAndName()) + + data, err := store.db.Get(key, nil) + + if err == leveldb.ErrNotFound { + return nil, filer_pb.ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(data) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) + + return entry, nil +} + +func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + key := genKey(fullpath.DirAndName()) + + err = store.db.Delete(key, nil) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + + batch := new(leveldb.Batch) + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + batch.Delete([]byte(genKey(string(fullpath), fileName))) + } + iter.Release() + + err = store.db.Write(batch, nil) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer.Entry, err error) { + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entry := &filer.Entry{ + FullPath: weed_util.NewFullPath(string(fullpath), fileName), + } + if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + iter.Release() + + return entries, err +} + +func genKey(dirPath, fileName string) (key []byte) { + key = []byte(dirPath) + key = append(key, DIR_FILE_SEPARATOR) + key = append(key, []byte(fileName)...) 
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+	keyPrefix = []byte(string(fullpath))
+	keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
+	if len(startFileName) > 0 {
+		keyPrefix = append(keyPrefix, []byte(startFileName)...)
+	}
+	return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+	sepIndex := len(key) - 1
+	for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
+		sepIndex--
+	}
+
+	return string(key[sepIndex+1:])
+
+}
+
+func (store *LevelDBStore) Shutdown() {
+	store.db.Close()
+}
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
new file mode 100644
index 000000000..df196b02e
--- /dev/null
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+	// name the local variable testFiler: shadowing the filer package would
+	// break the filer.Entry and filer.Attr references below
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+	defer os.RemoveAll(dir)
+	store := &LevelDBStore{}
+	store.initialize(dir)
+	testFiler.SetStore(store)
+
+	fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+	ctx := context.Background()
+
+	entry1 := &filer.Entry{
+		FullPath: fullpath,
+		Attr: filer.Attr{
+			Mode: 0440,
+			Uid:  1234,
+			Gid:  5678,
+		},
+	}
+
+	if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+		t.Errorf("create entry %v: %v", entry1.FullPath, err)
+		return
+	}
+
+	entry, err := testFiler.FindEntry(ctx, fullpath)
+
+	if err != nil {
+		t.Errorf("find entry: %v", err)
+		return
+	}
+
+	if entry.FullPath != entry1.FullPath {
+		t.Errorf("find wrong entry: %v", entry.FullPath)
+		return
+	}
+
+	// checking one upper directory
+	entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+	if len(entries) != 1 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+	// checking the root directory
+	entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+	if len(entries) != 1 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+	defer os.RemoveAll(dir)
+	store := &LevelDBStore{}
+	store.initialize(dir)
+	testFiler.SetStore(store)
+
+	ctx := context.Background()
+
+	// checking the root directory
+	entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+	if err != nil {
+		t.Errorf("list entries: %v", err)
+		return
+	}
+	if len(entries) != 0 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+}
diff --git a/weed/filer/leveldb2/leveldb2_local_store.go b/weed/filer/leveldb2/leveldb2_local_store.go
new file mode 100644
index 000000000..faae25c45
--- /dev/null
+++ b/weed/filer/leveldb2/leveldb2_local_store.go
@@ -0,0 +1,43 @@
+package leveldb
+
+import (
+	"fmt"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+	_ = filer.FilerLocalStore(&LevelDB2Store{})
+)
+
+func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
+
+	value := make([]byte, 8)
+	util.Uint64toBytes(value, uint64(lastTsNs))
+
+	err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
+
+	if err != nil {
+		return fmt.Errorf("UpdateOffset %s : %v", filer, err)
+ } + + println("UpdateOffset", filer, "lastTsNs", lastTsNs) + + return nil +} + +func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) { + + value, err := store.dbs[0].Get([]byte("meta"+filer), nil) + + if err != nil { + return 0, fmt.Errorf("ReadOffset %s : %v", filer, err) + } + + lastTsNs = int64(util.BytesToUint64(value)) + + println("ReadOffset", filer, "lastTsNs", lastTsNs) + + return +} diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go new file mode 100644 index 000000000..7a2bdac2e --- /dev/null +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -0,0 +1,251 @@ +package leveldb + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "github.com/syndtr/goleveldb/leveldb" + leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" + leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + filer.Stores = append(filer.Stores, &LevelDB2Store{}) +} + +type LevelDB2Store struct { + dbs []*leveldb.DB + dbCount int +} + +func (store *LevelDB2Store) GetName() string { + return "leveldb2" +} + +func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") + return store.initialize(dir, 8) +} + +func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { + glog.Infof("filer store leveldb2 dir: %s", dir) + if err := weed_util.TestFolderWritable(dir); err != nil { + return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) + } + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 4, + } + + for d := 0; d < dbCount; d++ { + dbFolder := fmt.Sprintf("%s/%02d", dir, d) + os.MkdirAll(dbFolder, 0755) + db, dbErr := leveldb.OpenFile(dbFolder, opts) + if leveldb_errors.IsCorrupted(dbErr) { + db, dbErr = leveldb.RecoverFile(dbFolder, opts) + } + if dbErr != nil { + glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr) + return dbErr + } + store.dbs = append(store.dbs, db) + } + store.dbCount = dbCount + + return +} + +func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.DirAndName() + key, partitionId := genKey(dir, name, store.dbCount) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + err = store.dbs[partitionId].Put(key, value, nil) + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) + + return nil +} + +func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry 
*filer.Entry, err error) {
+	dir, name := fullpath.DirAndName()
+	key, partitionId := genKey(dir, name, store.dbCount)
+
+	data, err := store.dbs[partitionId].Get(key, nil)
+
+	if err == leveldb.ErrNotFound {
+		return nil, filer_pb.ErrNotFound
+	}
+	if err != nil {
+		// use fullpath here: entry is still nil on this error path
+		return nil, fmt.Errorf("get %s : %v", fullpath, err)
+	}
+
+	entry = &filer.Entry{
+		FullPath: fullpath,
+	}
+	err = entry.DecodeAttributesAndChunks(data)
+	if err != nil {
+		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+	}
+
+	// println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+	return entry, nil
+}
+
+func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+	dir, name := fullpath.DirAndName()
+	key, partitionId := genKey(dir, name, store.dbCount)
+
+	err = store.dbs[partitionId].Delete(key, nil)
+	if err != nil {
+		return fmt.Errorf("delete %s : %v", fullpath, err)
+	}
+
+	return nil
+}
+
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+	directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+
+	batch := new(leveldb.Batch)
+
+	iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+	for iter.Next() {
+		key := iter.Key()
+		if !bytes.HasPrefix(key, directoryPrefix) {
+			break
+		}
+		fileName := getNameFromKey(key)
+		if fileName == "" {
+			continue
+		}
+		batch.Delete(append(directoryPrefix, []byte(fileName)...))
+	}
+	iter.Release()
+
+	err = store.dbs[partitionId].Write(batch, nil)
+
+	if err != nil {
+		return fmt.Errorf("delete %s : %v", fullpath, err)
+	}
+
+	return nil
+}
+
+func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+	return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
+	limit int) (entries []*filer.Entry, err error) {
+
+	directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+	lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+
+	iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
+	for iter.Next() {
+		key := iter.Key()
+		if !bytes.HasPrefix(key, directoryPrefix) {
+			break
+		}
+		fileName := getNameFromKey(key)
+		if fileName == "" {
+			continue
+		}
+		if fileName == startFileName && !inclusive {
+			continue
+		}
+		limit--
+		if limit < 0 {
+			break
+		}
+		entry := &filer.Entry{
+			FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+		}
+
+		// println("list", entry.FullPath, "chunks", len(entry.Chunks))
+
+		if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+			err = decodeErr
+			glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+			break
+		}
+		entries = append(entries, entry)
+	}
+	iter.Release()
+
+	return entries, err
+}
+
+func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
+	key, partitionId = hashToBytes(dirPath, dbCount)
+	key = append(key, []byte(fileName)...)
+	return key, partitionId
+}
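+
+// Editor's note (illustrative, not part of the original commit): unlike the
+// plain leveldb store, a key here starts with md5(dir) instead of the
+// directory string, and the last byte of that hash picks the partition, so
+// all siblings land in the same shard. For example, with dbCount = 8:
+//
+//	key, p := genKey("/home/chris", "file1.jpg", 8)
+//	// key = md5("/home/chris") (16 bytes) followed by "file1.jpg"
+//	// p   = int(md5("/home/chris")[15]) % 8, identical for all siblings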
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
+	keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
+	if len(startFileName) > 0 {
+		keyPrefix = append(keyPrefix, []byte(startFileName)...)
+	}
+	return keyPrefix, partitionId
+}
+
+func getNameFromKey(key []byte) string {
+
+	return string(key[md5.Size:])
+
+}
+
+// hash directory, and use the last byte for partitioning
+func hashToBytes(dir string, dbCount int) ([]byte, int) {
+	h := md5.New()
+	io.WriteString(h, dir)
+
+	b := h.Sum(nil)
+
+	x := b[len(b)-1]
+
+	return b, int(x) % dbCount
+}
+
+func (store *LevelDB2Store) Shutdown() {
+	for d := 0; d < store.dbCount; d++ {
+		store.dbs[d].Close()
+	}
+}
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
new file mode 100644
index 000000000..191de0040
--- /dev/null
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+	// as in the leveldb test, avoid shadowing the filer package
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+	defer os.RemoveAll(dir)
+	store := &LevelDB2Store{}
+	store.initialize(dir, 2)
+	testFiler.SetStore(store)
+
+	fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+	ctx := context.Background()
+
+	entry1 := &filer.Entry{
+		FullPath: fullpath,
+		Attr: filer.Attr{
+			Mode: 0440,
+			Uid:  1234,
+			Gid:  5678,
+		},
+	}
+
+	if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+		t.Errorf("create entry %v: %v", entry1.FullPath, err)
+		return
+	}
+
+	entry, err := testFiler.FindEntry(ctx, fullpath)
+
+	if err != nil {
+		t.Errorf("find entry: %v", err)
+		return
+	}
+
+	if entry.FullPath != entry1.FullPath {
+		t.Errorf("find wrong entry: %v", entry.FullPath)
+		return
+	}
+
+	// checking one upper directory
+	entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+	if len(entries) != 1 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+	// checking the root directory
+	entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+	if len(entries) != 1 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+	defer os.RemoveAll(dir)
+	store := &LevelDB2Store{}
+	store.initialize(dir, 2)
+	testFiler.SetStore(store)
+
+	ctx := context.Background()
+
+	// checking the root directory
+	entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+	if err != nil {
+		t.Errorf("list entries: %v", err)
+		return
+	}
+	if len(entries) != 0 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+}
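A brief editorial aside (not part of the diff): the FilerLocalStore offset API implemented by LevelDB2Store above is consumed by the MetaAggregator in the next file. A minimal sketch of the intended usage, with a hypothetical peer name:

	// resume from the persisted high-water mark, defaulting to "now"
	lastTsNs := time.Now().UnixNano()
	if prevTsNs, err := localStore.ReadOffset("filer-peer:8888"); err == nil {
		lastTsNs = prevTsNs
	}
	// ... subscribe and replay events newer than lastTsNs, then checkpoint:
	if err := localStore.UpdateOffset("filer-peer:8888", event.TsNs); err != nil {
		glog.V(0).Infof("failed to checkpoint offset: %v", err)
	}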
"github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +type MetaAggregator struct { + filers []string + grpcDialOption grpc.DialOption + MetaLogBuffer *log_buffer.LogBuffer + // notifying clients + ListenersLock sync.Mutex + ListenersCond *sync.Cond +} + +// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. +// The old data comes from what each LocalMetadata persisted on disk. +func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator { + t := &MetaAggregator{ + filers: filers, + grpcDialOption: grpcDialOption, + } + t.ListenersCond = sync.NewCond(&t.ListenersLock) + t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() { + t.ListenersCond.Broadcast() + }) + return t +} + +func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { + for _, filer := range ma.filers { + go ma.subscribeToOneFiler(f, self, filer) + } +} + +func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) { + + var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) + lastPersistTime := time.Now() + changesSinceLastPersist := 0 + lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() + + MaxChangeLimit := 100 + + if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok { + if self != filer { + + if prevTsNs, err := localStore.ReadOffset(filer); err == nil { + lastTsNs = prevTsNs + } + + glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) + maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { + if err := Replay(f.Store.ActualStore, event); err != nil { + glog.Errorf("failed to reply metadata change from %v: %v", filer, err) + return + } + changesSinceLastPersist++ + if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) { + if err := localStore.UpdateOffset(filer, event.TsNs); err == nil { + lastPersistTime = time.Now() + changesSinceLastPersist = 0 + } else { + glog.V(0).Infof("failed to update offset for %v: %v", filer, err) + } + } + } + } else { + glog.V(0).Infof("skipping following self: %v", self) + } + } + + processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error { + data, err := proto.Marshal(event) + if err != nil { + glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err) + return err + } + dir := event.Directory + // println("received meta change", dir, "size", len(data)) + ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) + if maybeReplicateMetadataChange != nil { + maybeReplicateMetadataChange(event) + } + return nil + } + + for { + err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + ClientName: "filer:" + self, + PathPrefix: "/", + SinceNs: lastTsNs, + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + return fmt.Errorf("process %v: %v", resp, err) + } + lastTsNs = resp.TsNs + } + }) + if err != nil { + glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err) + time.Sleep(1733 * time.Millisecond) + } + } +} diff --git a/weed/filer/meta_replay.go b/weed/filer/meta_replay.go new file mode 100644 index 
index 000000000..feb76278b
--- /dev/null
+++ b/weed/filer/meta_replay.go
@@ -0,0 +1,37 @@
+package filer
+
+import (
+	"context"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error {
+	message := resp.EventNotification
+	var oldPath util.FullPath
+	var newEntry *Entry
+	if message.OldEntry != nil {
+		oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+		glog.V(4).Infof("deleting %v", oldPath)
+		if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil {
+			return err
+		}
+	}
+
+	if message.NewEntry != nil {
+		dir := resp.Directory
+		if message.NewParentPath != "" {
+			dir = message.NewParentPath
+		}
+		key := util.NewFullPath(dir, message.NewEntry.Name)
+		glog.V(4).Infof("creating %v", key)
+		newEntry = FromPbEntry(dir, message.NewEntry)
+		if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
new file mode 100644
index 000000000..57d9a031d
--- /dev/null
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -0,0 +1,214 @@
+package mongodb
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/x/bsonx"
+)
+
+func init() {
+	filer.Stores = append(filer.Stores, &MongodbStore{})
+}
+
+type MongodbStore struct {
+	connect        *mongo.Client
+	database       string
+	collectionName string
+}
+
+type Model struct {
+	Directory string `bson:"directory"`
+	Name      string `bson:"name"`
+	Meta      []byte `bson:"meta"`
+}
+
+func (store *MongodbStore) GetName() string {
+	return "mongodb"
+}
+
+func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+	store.database = configuration.GetString(prefix + "database")
+	store.collectionName = "filemeta"
+	poolSize := configuration.GetInt(prefix + "option_pool_size")
+	return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize))
+}
+
+func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	opts := options.Client().ApplyURI(uri)
+
+	if poolSize > 0 {
+		opts.SetMaxPoolSize(poolSize)
+	}
+
+	client, err := mongo.Connect(ctx, opts)
+	if err != nil {
+		return err
+	}
+
+	c := client.Database(store.database).Collection(store.collectionName)
+	err = store.indexUnique(c)
+	store.connect = client
+	return err
+}
+
+func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error {
+	_, err := c.Indexes().CreateOne(context.Background(), index, opts)
+	return err
+}
+
+func (store *MongodbStore) indexUnique(c *mongo.Collection) error {
+	opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
+
+	unique := true
+
+	index := mongo.IndexModel{
+		Keys: bsonx.Doc{{Key: "directory", Value: bsonx.Int32(1)}, {Key: "name", Value: bsonx.Int32(1)}},
+		Options: &options.IndexOptions{
+			Unique: &unique,
+		},
+	}
+
+	return store.createIndex(c, index, opts)
+}
+
+func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+	return ctx, nil
+}
+
+func (store *MongodbStore) CommitTransaction(ctx context.Context) error {
+	return nil
+}
+
+func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
+	return nil
+}
+
+func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+	dir, name := entry.FullPath.DirAndName()
+	meta, err := entry.EncodeAttributesAndChunks()
+	if err != nil {
+		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+	}
+
+	c := store.connect.Database(store.database).Collection(store.collectionName)
+
+	// upsert on (directory, name) so that re-inserting an existing entry does
+	// not trip over the unique index created in indexUnique
+	where := bson.M{"directory": dir, "name": name}
+	if _, err = c.ReplaceOne(ctx, where, Model{
+		Directory: dir,
+		Name:      name,
+		Meta:      meta,
+	}, options.Replace().SetUpsert(true)); err != nil {
+		return fmt.Errorf("upsert %s: %v", entry.FullPath, err)
+	}
+
+	return nil
+}
+
+func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+	// InsertEntry upserts on (directory, name), so update can delegate to it
+	return store.InsertEntry(ctx, entry)
+}
+
+func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+	dir, name := fullpath.DirAndName()
+	var data Model
+
+	var where = bson.M{"directory": dir, "name": name}
+	err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
+	if err == mongo.ErrNoDocuments {
+		return nil, filer_pb.ErrNotFound
+	}
+	if err != nil {
+		return nil, fmt.Errorf("find %s: %v", fullpath, err)
+	}
+
+	if len(data.Meta) == 0 {
+		return nil, filer_pb.ErrNotFound
+	}
+
+	entry = &filer.Entry{
+		FullPath: fullpath,
+	}
+
+	err = entry.DecodeAttributesAndChunks(data.Meta)
+	if err != nil {
+		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+	}
+
+	return entry, nil
+}
+
+func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+	dir, name := fullpath.DirAndName()
+
+	where := bson.M{"directory": dir, "name": name}
+	_, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
+	if err != nil {
+		return fmt.Errorf("delete %s : %v", fullpath, err)
+	}
+
+	return nil
+}
+
+func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+
+	where := bson.M{"directory": string(fullpath)}
+	_, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
+	if err != nil {
+		return fmt.Errorf("delete %s : %v", fullpath, err)
+	}
+
+	return nil
+}
+
+func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+	return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+
+	var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}
+	if inclusive {
+		where["name"] = bson.M{
+			"$gte": startFileName,
+		}
+	}
+	optLimit := int64(limit)
+	opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
+	cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
+	if err != nil {
+		return nil, fmt.Errorf("list %s: %v", fullpath, err)
+	}
+	for cur.Next(ctx) {
+		var data Model
+		if err := cur.Decode(&data); err != nil {
+			return nil, err
+		}
+
+		entry := &filer.Entry{
+			FullPath: util.NewFullPath(string(fullpath), data.Name),
+		}
+		if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
+			err = decodeErr
+			glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+			break
+		}
+
+		entries = append(entries, entry)
+	}
+
+	if err := cur.Close(ctx); err != nil {
+		glog.V(0).Infof("list iterator close: %v", err)
+	}
+
+	return entries, err
+}
+
+func (store *MongodbStore) Shutdown() {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	store.connect.Disconnect(ctx)
+}
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
new file mode 100644
index 000000000..708a67cc3
--- /dev/null
+++ b/weed/filer/mysql/mysql_store.go
@@ -0,0 +1,74 @@
+package mysql
+
+import (
+	"database/sql"
+	"fmt"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	_ "github.com/go-sql-driver/mysql"
+)
+
+const (
+	CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+)
+
+func init() {
+	filer.Stores = append(filer.Stores, &MysqlStore{})
+}
+
+type MysqlStore struct {
+	abstract_sql.AbstractSqlStore
+}
+
+func (store *MysqlStore) GetName() string {
+	return "mysql"
+}
+
+func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+	return store.initialize(
+		configuration.GetString(prefix+"username"),
+		configuration.GetString(prefix+"password"),
+		configuration.GetString(prefix+"hostname"),
+		configuration.GetInt(prefix+"port"),
+		configuration.GetString(prefix+"database"),
+		configuration.GetInt(prefix+"connection_max_idle"),
+		configuration.GetInt(prefix+"connection_max_open"),
+		configuration.GetBool(prefix+"interpolateParams"),
+	)
+}
+
+func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
+	interpolateParams bool) (err error) {
+
+	store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
+	store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
+	store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+	store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+	store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
+	store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
+	store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
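+
+	// The dirhash column above stores a 64-bit hash of the directory, which
+	// keeps the (dirhash, name) primary key compact for very deep paths; the
+	// full directory string still appears in every WHERE clause to guard
+	// against hash collisions. A hedged sketch of how a lookup binds its
+	// parameters (illustrative only, mirroring SqlFind above):
+	//
+	//	var meta []byte
+	//	row := store.DB.QueryRowContext(ctx, store.SqlFind,
+	//		util.HashStringToLong(dir), // dirhash of the parent directory
+	//		name,                       // file name within the directory
+	//		dir)                        // full directory path, re-checked
+	//	err := row.Scan(&meta)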
+
+	sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+	if interpolateParams {
+		sqlUrl += "&interpolateParams=true"
+	}
+
+	var dbErr error
+	store.DB, dbErr = sql.Open("mysql", sqlUrl)
+	if dbErr != nil {
+		store.DB = nil
+		return fmt.Errorf("can not connect to %s: %v", sqlUrl, dbErr)
+	}
+
+	store.DB.SetMaxIdleConns(maxIdle)
+	store.DB.SetMaxOpenConns(maxOpen)
+
+	if err = store.DB.Ping(); err != nil {
+		return fmt.Errorf("connect to %s: %v", sqlUrl, err)
+	}
+
+	return nil
+}
diff --git a/weed/filer/permission.go b/weed/filer/permission.go
new file mode 100644
index 000000000..0d8b8292b
--- /dev/null
+++ b/weed/filer/permission.go
@@ -0,0 +1,22 @@
+package filer
+
+func hasWritePermission(dir *Entry, entry *Entry) bool {
+
+	if dir == nil {
+		return false
+	}
+
+	if dir.Uid == entry.Uid && dir.Mode&0200 > 0 {
+		return true
+	}
+
+	if dir.Gid == entry.Gid && dir.Mode&0020 > 0 {
+		return true
+	}
+
+	if dir.Mode&0002 > 0 {
+		return true
+	}
+
+	return false
+}
diff --git a/weed/filer/postgres/README.txt b/weed/filer/postgres/README.txt
new file mode 100644
index 000000000..cb0c99c63
--- /dev/null
+++ b/weed/filer/postgres/README.txt
@@ -0,0 +1,17 @@
+
+1. create "seaweedfs" database
+
+export PGHOME=/Library/PostgreSQL/10
+$PGHOME/bin/createdb --username=postgres --password seaweedfs
+
+2. create "filemeta" table
+$PGHOME/bin/psql --username=postgres --password seaweedfs
+
+CREATE TABLE IF NOT EXISTS filemeta (
+  dirhash   BIGINT,
+  name      VARCHAR(65535),
+  directory VARCHAR(65535),
+  meta      bytea,
+  PRIMARY KEY (dirhash, name)
+);
+
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
new file mode 100644
index 000000000..4544c8416
--- /dev/null
+++ b/weed/filer/postgres/postgres_store.go
@@ -0,0 +1,75 @@
+package postgres
+
+import (
+	"database/sql"
+	"fmt"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	_ "github.com/lib/pq"
+)
+
+const (
+	CONNECTION_URL_PATTERN = "host=%s port=%d user=%s sslmode=%s connect_timeout=30"
+)
+
+func init() {
+	filer.Stores = append(filer.Stores, &PostgresStore{})
+}
+
+type PostgresStore struct {
+	abstract_sql.AbstractSqlStore
+}
+
+func (store *PostgresStore) GetName() string {
+	return "postgres"
+}
+
+func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+	return store.initialize(
+		configuration.GetString(prefix+"username"),
+		configuration.GetString(prefix+"password"),
+		configuration.GetString(prefix+"hostname"),
+		configuration.GetInt(prefix+"port"),
+		configuration.GetString(prefix+"database"),
+		configuration.GetString(prefix+"sslmode"),
+		configuration.GetInt(prefix+"connection_max_idle"),
+		configuration.GetInt(prefix+"connection_max_open"),
+	)
+}
+
+func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
+
+	store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
+	store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
+	store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+	store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+	store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
+	store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5"
+	store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5"
+
+	sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode)
+	if password != "" {
+		sqlUrl += " password=" + password
+	}
+	if database != "" {
+		sqlUrl += " dbname=" + database
+	}
+	var dbErr error
+	store.DB, dbErr = sql.Open("postgres", sqlUrl)
+	if dbErr != nil {
+		store.DB = nil
+		return fmt.Errorf("can not connect to %s: %v", sqlUrl, dbErr)
+	}
+
+	store.DB.SetMaxIdleConns(maxIdle)
+	store.DB.SetMaxOpenConns(maxOpen)
+
+	if err = store.DB.Ping(); err != nil {
+		return fmt.Errorf("connect to %s: %v", sqlUrl, err)
+	}
+
+	return nil
+}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
new file mode 100644
index 000000000..9f338782e
--- /dev/null
+++ b/weed/filer/reader_at.go
@@ -0,0 +1,149 @@
+package filer
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+	"github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+type ChunkReadAt struct {
+	masterClient *wdclient.MasterClient
+	chunkViews   []*ChunkView
+	lookupFileId func(fileId string) (targetUrl string, err error)
+	readerLock   sync.Mutex
+	fileSize     int64
+
+	chunkCache chunk_cache.ChunkCache
+}
+
+// var _ = io.ReaderAt(&ChunkReadAt{})
+
+type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+
+func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+	return func(fileId string) (targetUrl string, err error) {
+		err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+			vid := VolumeId(fileId)
+			resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+				VolumeIds: []string{vid},
+			})
+			if err != nil {
+				return err
+			}
+
+			locations := resp.LocationsMap[vid]
+			if locations == nil || len(locations.Locations) == 0 {
+				glog.V(0).Infof("failed to locate %s", fileId)
+				return fmt.Errorf("failed to locate %s", fileId)
+			}
+
+			volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
+
+			targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+
+			return nil
+		})
+		return
+	}
+}
+
+func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
+
+	return &ChunkReadAt{
+		chunkViews:   chunkViews,
+		lookupFileId: LookupFn(filerClient),
+		chunkCache:   chunkCache,
+		fileSize:     fileSize,
+	}
+}
+
+func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
+
+	c.readerLock.Lock()
+	defer c.readerLock.Unlock()
+
+	glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+	return c.doReadAt(p[n:], offset+int64(n))
+}
+
+func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
+
+	var buffer []byte
+	startOffset, remaining := offset, int64(len(p))
+	for i, chunk := range c.chunkViews {
+		if remaining <= 0 {
+			break
+		}
+		if startOffset < chunk.LogicOffset {
+			gap := chunk.LogicOffset - startOffset
+			glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+gap)
+			zeroSz := min(gap, remaining)
+			// the gap is not backed by any chunk: zero it explicitly,
+			// since p may carry stale bytes from an earlier read
+			for z := int64(0); z < zeroSz; z++ {
+				p[int64(n)+z] = 0
+			}
+			n += int(zeroSz)
+			startOffset, remaining = chunk.LogicOffset, remaining-gap
+			if remaining <= 0 {
+				break
+			}
+		}
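+		// Next, intersect the requested window [startOffset, startOffset+remaining)
+		// with this chunk's logical extent; an empty intersection means the chunk
+		// lies entirely outside the window and is skipped.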
+		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
+		chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
+		if chunkStart >= chunkStop {
+			continue
+		}
+		glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+		buffer, err = c.readFromWholeChunkData(chunk)
+		if err != nil {
+			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
+			return
+		}
+		bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
+		copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
+		n += copied
+		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
+	}
+
+	glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+
+	if err == nil && remaining > 0 && c.fileSize > startOffset {
+		delta := int(min(remaining, c.fileSize-startOffset))
+		glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
+		// the tail past the last chunk is a hole as well: zero it before counting it
+		for z := 0; z < delta; z++ {
+			p[n+z] = 0
+		}
+		n += delta
+	}
+
+	if err == nil && offset+int64(len(p)) > c.fileSize {
+		err = io.EOF
+	}
+	// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
+
+	return
+
+}
+
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
+
+	glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+
+	chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+	if chunkData != nil {
+		glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
+	} else {
+		glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
+		chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
+		if err != nil {
+			return
+		}
+		c.chunkCache.SetChunk(chunkView.FileId, chunkData)
+	}
+
+	return
+}
+
+func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+
+	return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
+
+}
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
new file mode 100644
index 000000000..d4a34cbfe
--- /dev/null
+++ b/weed/filer/reader_at_test.go
@@ -0,0 +1,156 @@
+package filer
+
+import (
+	"fmt"
+	"io"
+	"math"
+	"strconv"
+	"sync"
+	"testing"
+)
+
+type mockChunkCache struct {
+}
+
+func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
+	x, _ := strconv.Atoi(fileId)
+	data = make([]byte, minSize)
+	for i := 0; i < int(minSize); i++ {
+		data[i] = byte(x)
+	}
+	return data
+}
+func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
+}
+
+func TestReaderAt(t *testing.T) {
+
+	visibles := []VisibleInterval{
+		{
+			start:     1,
+			stop:      2,
+			fileId:    "1",
+			chunkSize: 9,
+		},
+		{
+			start:     3,
+			stop:      4,
+			fileId:    "3",
+			chunkSize: 1,
+		},
+		{
+			start:     5,
+			stop:      6,
+			fileId:    "5",
+			chunkSize: 2,
+		},
+		{
+			start:     7,
+			stop:      9,
+			fileId:    "7",
+			chunkSize: 2,
+		},
+		{
+			start:     9,
+			stop:      10,
+			fileId:    "9",
+			chunkSize: 2,
+		},
+	}
+
+	readerAt :=
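+	// mockChunkCache above fills every fetched chunk with byte(fileId), so the
+	// reads below can verify which visible interval served each byte; bytes
+	// not covered by any interval must come back as zeros.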
&ChunkReadAt{ + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + lookupFileId: nil, + readerLock: sync.Mutex{}, + fileSize: 10, + chunkCache: &mockChunkCache{}, + } + + testReadAt(t, readerAt, 0, 10, 10, nil) + testReadAt(t, readerAt, 0, 12, 10, io.EOF) + testReadAt(t, readerAt, 2, 8, 8, nil) + testReadAt(t, readerAt, 3, 6, 6, nil) + +} + +func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) { + data := make([]byte, size) + n, err := readerAt.ReadAt(data, offset) + + for _, d := range data { + fmt.Printf("%x", d) + } + fmt.Println() + + if expected != n { + t.Errorf("unexpected read size: %d, expect: %d", n, expected) + } + if err != expectedErr { + t.Errorf("unexpected read error: %v, expect: %v", err, expectedErr) + } + +} + +func TestReaderAt0(t *testing.T) { + + visibles := []VisibleInterval{ + { + start: 2, + stop: 5, + fileId: "1", + chunkSize: 9, + }, + { + start: 7, + stop: 9, + fileId: "2", + chunkSize: 9, + }, + } + + readerAt := &ChunkReadAt{ + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + lookupFileId: nil, + readerLock: sync.Mutex{}, + fileSize: 10, + chunkCache: &mockChunkCache{}, + } + + testReadAt(t, readerAt, 0, 10, 10, nil) + testReadAt(t, readerAt, 3, 16, 7, io.EOF) + testReadAt(t, readerAt, 3, 5, 5, nil) + + testReadAt(t, readerAt, 11, 5, 0, io.EOF) + testReadAt(t, readerAt, 10, 5, 0, io.EOF) + +} + +func TestReaderAt1(t *testing.T) { + + visibles := []VisibleInterval{ + { + start: 2, + stop: 5, + fileId: "1", + chunkSize: 9, + }, + } + + readerAt := &ChunkReadAt{ + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + lookupFileId: nil, + readerLock: sync.Mutex{}, + fileSize: 20, + chunkCache: &mockChunkCache{}, + } + + testReadAt(t, readerAt, 0, 20, 20, nil) + testReadAt(t, readerAt, 1, 7, 7, nil) + testReadAt(t, readerAt, 0, 1, 1, nil) + testReadAt(t, readerAt, 18, 4, 2, io.EOF) + testReadAt(t, readerAt, 12, 4, 4, nil) + testReadAt(t, readerAt, 4, 20, 16, io.EOF) + testReadAt(t, readerAt, 4, 10, 10, nil) + testReadAt(t, readerAt, 1, 10, 10, nil) + +} diff --git a/weed/filer/redis/redis_cluster_store.go b/weed/filer/redis/redis_cluster_store.go new file mode 100644 index 000000000..8af94ee55 --- /dev/null +++ b/weed/filer/redis/redis_cluster_store.go @@ -0,0 +1,42 @@ +package redis + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisClusterStore{}) +} + +type RedisClusterStore struct { + UniversalRedisStore +} + +func (store *RedisClusterStore) GetName() string { + return "redis_cluster" +} + +func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", true) + configuration.SetDefault(prefix+"routeByLatency", true) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + ) +} + +func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + return +} diff --git a/weed/filer/redis/redis_store.go b/weed/filer/redis/redis_store.go 
new file mode 100644 index 000000000..e152457ed --- /dev/null +++ b/weed/filer/redis/redis_store.go @@ -0,0 +1,36 @@ +package redis + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisStore{}) +} + +type RedisStore struct { + UniversalRedisStore +} + +func (store *RedisStore) GetName() string { + return "redis" +} + +func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *RedisStore) initialize(hostPort string, password string, database int) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + return +} diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go new file mode 100644 index 000000000..cc8819019 --- /dev/null +++ b/weed/filer/redis/universal_redis_store.go @@ -0,0 +1,191 @@ +package redis + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/go-redis/redis" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedisStore struct { + Client redis.UniversalClient +} + +func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + if name != "" { + _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result() + if err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + data, err := store.Client.Get(string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks([]byte(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + _, err = store.Client.Del(string(fullpath)).Result() + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) 
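+	// Each entry lives under two keys: its full path (holding the encoded
+	// value) and its name inside the parent directory's set (for listing).
+	// Insert and delete are therefore two non-atomic steps; the SRem below
+	// is the second step of the delete.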
+ } + + dir, name := fullpath.DirAndName() + if name != "" { + _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + + members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() + if err != nil { + return fmt.Errorf("delete folder %s : %v", fullpath, err) + } + + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + _, err = store.Client.Del(string(path)).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer.Entry, err error) { + + dirListKey := genDirectoryListKey(string(fullpath)) + members, err := store.Client.SMembers(dirListKey).Result() + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + + // skip + if startFileName != "" { + var t []string + for _, m := range members { + if strings.Compare(m, startFileName) >= 0 { + if m == startFileName { + if inclusive { + t = append(t, m) + } + } else { + t = append(t, m) + } + } + } + members = t + } + + // sort + sort.Slice(members, func(i, j int) bool { + return strings.Compare(members[i], members[j]) < 0 + }) + + // limit + if limit < len(members) { + members = members[:limit] + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + entry, err := store.FindEntry(ctx, path) + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(string(path)).Result() + store.Client.SRem(dirListKey, fileName).Result() + continue + } + } + entries = append(entries, entry) + } + } + + return entries, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedisStore) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go new file mode 100644 index 000000000..d155dbe88 --- /dev/null +++ b/weed/filer/redis2/redis_cluster_store.go @@ -0,0 +1,42 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisCluster2Store{}) +} + +type RedisCluster2Store struct { + UniversalRedis2Store +} + +func (store *RedisCluster2Store) GetName() string { + return "redis_cluster2" +} + +func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", true) + configuration.SetDefault(prefix+"routeByLatency", true) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + 
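+		// useReadOnly sends reads to replica nodes and routeByLatency picks the
+		// closest node; both default to true via the SetDefault calls above.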
configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + ) +} + +func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + return +} diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go new file mode 100644 index 000000000..ed04c817b --- /dev/null +++ b/weed/filer/redis2/redis_store.go @@ -0,0 +1,36 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis" +) + +func init() { + filer.Stores = append(filer.Stores, &Redis2Store{}) +} + +type Redis2Store struct { + UniversalRedis2Store +} + +func (store *Redis2Store) GetName() string { + return "redis2" +} + +func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + return +} diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go new file mode 100644 index 000000000..9e06ff68f --- /dev/null +++ b/weed/filer/redis2/universal_redis_store.go @@ -0,0 +1,166 @@ +package redis2 + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedis2Store struct { + Client redis.UniversalClient +} + +func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + if name != "" { + if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + data, err := store.Client.Get(string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, 
fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks([]byte(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + _, err = store.Client.Del(string(fullpath)).Result() + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + dir, name := fullpath.DirAndName() + if name != "" { + _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + + members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() + if err != nil { + return fmt.Errorf("delete folder %s : %v", fullpath, err) + } + + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + _, err = store.Client.Del(string(path)).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer.Entry, err error) { + + dirListKey := genDirectoryListKey(string(fullpath)) + start := int64(0) + if startFileName != "" { + start, _ = store.Client.ZRank(dirListKey, startFileName).Result() + if !inclusive { + start++ + } + } + members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result() + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(fullpath), fileName) + entry, err := store.FindEntry(ctx, path) + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(string(path)).Result() + store.Client.ZRem(dirListKey, fileName).Result() + continue + } + } + entries = append(entries, entry) + } + } + + return entries, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedis2Store) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/stream.go b/weed/filer/stream.go new file mode 100644 index 000000000..416359ebf --- /dev/null +++ b/weed/filer/stream.go @@ -0,0 +1,204 @@ +package filer + +import ( + "bytes" + "io" + "math" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" +) + +func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { + + // fmt.Printf("start to stream content for chunks: %+v\n", chunks) + chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) + + 
fileId2Url := make(map[string]string)
+
+	for _, chunkView := range chunkViews {
+
+		urlString, err := masterClient.LookupFileId(chunkView.FileId)
+		if err != nil {
+			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+			return err
+		}
+		fileId2Url[chunkView.FileId] = urlString
+	}
+
+	for _, chunkView := range chunkViews {
+
+		urlString := fileId2Url[chunkView.FileId]
+		err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+			w.Write(data)
+		})
+		if err != nil {
+			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+			return err
+		}
+	}
+
+	return nil
+
+}
+
+// ---------------- ReadAllReader ----------------------------------
+
+func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
+
+	buffer := bytes.Buffer{}
+
+	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+		return masterClient.LookupFileId(fileId)
+	}
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+	for _, chunkView := range chunkViews {
+		urlString, err := lookupFileIdFn(chunkView.FileId)
+		if err != nil {
+			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+			return nil, err
+		}
+		err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+			buffer.Write(data)
+		})
+		if err != nil {
+			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+			return nil, err
+		}
+	}
+	return buffer.Bytes(), nil
+}
+
+// ---------------- ChunkStreamReader ----------------------------------
+type ChunkStreamReader struct {
+	chunkViews   []*ChunkView
+	logicOffset  int64
+	buffer       []byte
+	bufferOffset int64
+	bufferPos    int
+	chunkIndex   int
+	lookupFileId LookupFileIdFunctionType
+}
+
+var _ = io.ReadSeeker(&ChunkStreamReader{})
+
+func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+		return masterClient.LookupFileId(fileId)
+	}
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+	return &ChunkStreamReader{
+		chunkViews:   chunkViews,
+		lookupFileId: lookupFileIdFn,
+	}
+}
+
+func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+	lookupFileIdFn := LookupFn(filerClient)
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+	return &ChunkStreamReader{
+		chunkViews:   chunkViews,
+		lookupFileId: lookupFileIdFn,
+	}
+}
+
+func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
+	for n < len(p) {
+		if c.isBufferEmpty() {
+			if c.chunkIndex >= len(c.chunkViews) {
+				return n, io.EOF
+			}
+			chunkView := c.chunkViews[c.chunkIndex]
+			if err = c.fetchChunkToBuffer(chunkView); err != nil {
+				// surface fetch failures instead of silently re-reading a stale buffer
+				return
+			}
+			c.chunkIndex++
+		}
+		t := copy(p[n:], c.buffer[c.bufferPos:])
+		c.bufferPos += t
+		n += t
+	}
+	return
+}
+
+func (c *ChunkStreamReader) isBufferEmpty() bool {
+	return len(c.buffer) <= c.bufferPos
+}
+
+func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
+
+	var totalSize int64
+	for _, chunk := range c.chunkViews {
+		totalSize += int64(chunk.Size)
+	}
+
+	var err error
+	switch whence {
+	case io.SeekStart:
+	case io.SeekCurrent:
+		offset += c.bufferOffset + int64(c.bufferPos)
+	case
io.SeekEnd: + offset = totalSize + offset + } + if offset > totalSize { + err = io.ErrUnexpectedEOF + } + + for i, chunk := range c.chunkViews { + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + c.fetchChunkToBuffer(chunk) + c.chunkIndex = i + 1 + break + } + } + } + c.bufferPos = int(offset - c.bufferOffset) + + return offset, err + +} + +func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { + urlString, err := c.lookupFileId(chunkView.FileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return err + } + var buffer bytes.Buffer + err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + return err + } + c.buffer = buffer.Bytes() + c.bufferPos = 0 + c.bufferOffset = chunkView.LogicOffset + + // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + + return nil +} + +func (c *ChunkStreamReader) Close() { + // TODO try to release and reuse buffer +} + +func VolumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} diff --git a/weed/filer/topics.go b/weed/filer/topics.go new file mode 100644 index 000000000..3a2fde8c4 --- /dev/null +++ b/weed/filer/topics.go @@ -0,0 +1,6 @@ +package filer + +const ( + TopicsDir = "/topics" + SystemLogDir = TopicsDir + "/.system/log" +) |
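
Note: below is a minimal sketch, not part of this change, of how the store plugin pattern in this diff composes end to end. It assumes the FilerStore interface is exactly the method set the stores above implement; the "memory" package, store name, and map layout are hypothetical illustrations.

// Package memory: a hypothetical in-memory FilerStore following the same
// register-in-init, configure-by-prefix pattern as the mysql/postgres/redis
// stores in this change.
package memory

import (
	"context"
	"sort"
	"strings"
	"sync"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

func init() {
	// self-register, so the filer can select this store by name
	filer.Stores = append(filer.Stores, &MemStore{})
}

type MemStore struct {
	sync.Mutex
	entries map[util.FullPath]*filer.Entry
}

func (store *MemStore) GetName() string { return "memory" }

func (store *MemStore) Initialize(configuration util.Configuration, prefix string) error {
	// a real store would read configuration.GetString(prefix+"...") options here
	store.entries = make(map[util.FullPath]*filer.Entry)
	return nil
}

// No transactional backend: mirror the redis stores and pass the context through.
func (store *MemStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	return ctx, nil
}
func (store *MemStore) CommitTransaction(ctx context.Context) error   { return nil }
func (store *MemStore) RollbackTransaction(ctx context.Context) error { return nil }

func (store *MemStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
	store.Lock()
	defer store.Unlock()
	store.entries[entry.FullPath] = entry
	return nil
}

func (store *MemStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
	return store.InsertEntry(ctx, entry)
}

func (store *MemStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
	store.Lock()
	defer store.Unlock()
	entry, found := store.entries[fullpath]
	if !found {
		return nil, filer_pb.ErrNotFound
	}
	return entry, nil
}

func (store *MemStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
	store.Lock()
	defer store.Unlock()
	delete(store.entries, fullpath)
	return nil
}

func (store *MemStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
	store.Lock()
	defer store.Unlock()
	prefix := string(fullpath) + "/"
	for p := range store.entries {
		if strings.HasPrefix(string(p), prefix) {
			delete(store.entries, p)
		}
	}
	return nil
}

func (store *MemStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
	store.Lock()
	defer store.Unlock()
	for p, entry := range store.entries {
		dir, name := p.DirAndName()
		if dir != string(fullpath) || name == "" {
			continue
		}
		// same start/inclusive semantics as the redis stores above
		if name < startFileName || (name == startFileName && !inclusive) {
			continue
		}
		entries = append(entries, entry)
	}
	sort.Slice(entries, func(i, j int) bool { return entries[i].FullPath < entries[j].FullPath })
	if limit < len(entries) {
		entries = entries[:limit]
	}
	return entries, nil
}

func (store *MemStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*filer.Entry, error) {
	return nil, filer.ErrUnsupportedListDirectoryPrefixed
}

func (store *MemStore) Shutdown() {}

Like the leveldb2 store, listing is driven off the entry keys themselves rather than a per-directory index; the redis stores instead maintain an explicit per-directory set because their key space is not ordered.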
