diff options
Diffstat (limited to 'weed/filer2')
42 files changed, 0 insertions, 5453 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go deleted file mode 100644 index ce6377aac..000000000 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ /dev/null @@ -1,206 +0,0 @@ -package abstract_sql - -import ( - "context" - "database/sql" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type AbstractSqlStore struct { - DB *sql.DB - SqlInsert string - SqlUpdate string - SqlFind string - SqlDelete string - SqlDeleteFolderChildren string - SqlListExclusive string - SqlListInclusive string -} - -type TxOrDB interface { - ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) - QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row - QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) -} - -func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) { - tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ - Isolation: sql.LevelReadCommitted, - ReadOnly: false, - }) - if err != nil { - return ctx, err - } - - return context.WithValue(ctx, "tx", tx), nil -} -func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error { - if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx.Commit() - } - return nil -} -func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { - if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx.Rollback() - } - return nil -} - -func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB { - if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx - } - return store.DB -} - -func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) - if err != nil { - return fmt.Errorf("insert %s: %s", entry.FullPath, err) - } - - affectedRows, err := res.RowsAffected() - if err == nil && affectedRows > 0 { - return nil - } - - // now the insert failed possibly due to duplication constraints - glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err) - - res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("upsert %s: %s", entry.FullPath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err) - } - return nil - -} - -func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("update %s: %s", entry.FullPath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) - } - return nil -} - -func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) { - - dir, name := fullpath.DirAndName() - row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) - var data []byte - if err := row.Scan(&data); err != nil { - return nil, filer_pb.ErrNotFound - } - - entry := &filer2.Entry{ - FullPath: fullpath, - } - if err := entry.DecodeAttributesAndChunks(data); err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - - dir, name := fullpath.DirAndName() - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("delete %s: %s", fullpath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) - } - - return nil -} - -func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath) - if err != nil { - return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) - } - - return nil -} - -func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - sqlText := store.SqlListExclusive - if inclusive { - sqlText = store.SqlListInclusive - } - - rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit) - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - defer rows.Close() - - for rows.Next() { - var name string - var data []byte - if err = rows.Scan(&name, &data); err != nil { - glog.V(0).Infof("scan %s : %v", fullpath, err) - return nil, fmt.Errorf("scan %s: %v", fullpath, err) - } - - entry := &filer2.Entry{ - FullPath: util.NewFullPath(string(fullpath), name), - } - if err = entry.DecodeAttributesAndChunks(data); err != nil { - glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) - return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) - } - - entries = append(entries, entry) - } - - return entries, nil -} - -func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "") -} - -func (store *AbstractSqlStore) Shutdown() { - store.DB.Close() -} diff --git a/weed/filer2/cassandra/README.txt b/weed/filer2/cassandra/README.txt deleted file mode 100644 index 122c9c3f4..000000000 --- a/weed/filer2/cassandra/README.txt +++ /dev/null @@ -1,14 +0,0 @@ -1. create a keyspace - -CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1}; - -2. create filemeta table - - USE seaweedfs; - - CREATE TABLE filemeta ( - directory varchar, - name varchar, - meta blob, - PRIMARY KEY (directory, name) - ) WITH CLUSTERING ORDER BY (name ASC); diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go deleted file mode 100644 index 4d845c2fa..000000000 --- a/weed/filer2/cassandra/cassandra_store.go +++ /dev/null @@ -1,163 +0,0 @@ -package cassandra - -import ( - "context" - "fmt" - "github.com/gocql/gocql" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func init() { - filer2.Stores = append(filer2.Stores, &CassandraStore{}) -} - -type CassandraStore struct { - cluster *gocql.ClusterConfig - session *gocql.Session -} - -func (store *CassandraStore) GetName() string { - return "cassandra" -} - -func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.initialize( - configuration.GetString(prefix+"keyspace"), - configuration.GetStringSlice(prefix+"hosts"), - ) -} - -func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) { - store.cluster = gocql.NewCluster(hosts...) - store.cluster.Keyspace = keyspace - store.cluster.Consistency = gocql.LocalQuorum - store.session, err = store.cluster.CreateSession() - if err != nil { - glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) - } - return -} - -func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *CassandraStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - if err := store.session.Query( - "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ", - dir, name, meta, entry.TtlSec).Exec(); err != nil { - return fmt.Errorf("insert %s: %s", entry.FullPath, err) - } - - return nil -} - -func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { - - dir, name := fullpath.DirAndName() - var data []byte - if err := store.session.Query( - "SELECT meta FROM filemeta WHERE directory=? AND name=?", - dir, name).Consistency(gocql.One).Scan(&data); err != nil { - if err != gocql.ErrNotFound { - return nil, filer_pb.ErrNotFound - } - } - - if len(data) == 0 { - return nil, filer_pb.ErrNotFound - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - - dir, name := fullpath.DirAndName() - - if err := store.session.Query( - "DELETE FROM filemeta WHERE directory=? AND name=?", - dir, name).Exec(); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - - if err := store.session.Query( - "DELETE FROM filemeta WHERE directory=?", - fullpath).Exec(); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" - if inclusive { - cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" - } - - var data []byte - var name string - iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter() - for iter.Scan(&name, &data) { - entry := &filer2.Entry{ - FullPath: util.NewFullPath(string(fullpath), name), - } - if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - if err := iter.Close(); err != nil { - glog.V(0).Infof("list iterator close: %v", err) - } - - return entries, err -} - -func (store *CassandraStore) Shutdown() { - store.session.Close() -} diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go deleted file mode 100644 index a174117ea..000000000 --- a/weed/filer2/configuration.go +++ /dev/null @@ -1,50 +0,0 @@ -package filer2 - -import ( - "os" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/spf13/viper" -) - -var ( - Stores []FilerStore -) - -func (f *Filer) LoadConfiguration(config *viper.Viper) { - - validateOneEnabledStore(config) - - for _, store := range Stores { - if config.GetBool(store.GetName() + ".enabled") { - if err := store.Initialize(config, store.GetName()+"."); err != nil { - glog.Fatalf("Failed to initialize store for %s: %+v", - store.GetName(), err) - } - f.SetStore(store) - glog.V(0).Infof("Configure filer for %s", store.GetName()) - return - } - } - - println() - println("Supported filer stores are:") - for _, store := range Stores { - println(" " + store.GetName()) - } - - os.Exit(-1) -} - -func validateOneEnabledStore(config *viper.Viper) { - enabledStore := "" - for _, store := range Stores { - if config.GetBool(store.GetName() + ".enabled") { - if enabledStore == "" { - enabledStore = store.GetName() - } else { - glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName()) - } - } - } -} diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go deleted file mode 100644 index fedfde40d..000000000 --- a/weed/filer2/entry.go +++ /dev/null @@ -1,91 +0,0 @@ -package filer2 - -import ( - "os" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type Attr struct { - Mtime time.Time // time of last modification - Crtime time.Time // time of creation (OS X only) - Mode os.FileMode // file mode - Uid uint32 // owner uid - Gid uint32 // group gid - Mime string // mime type - Replication string // replication - Collection string // collection name - TtlSec int32 // ttl in seconds - UserName string - GroupNames []string - SymlinkTarget string - Md5 []byte - FileSize uint64 -} - -func (attr Attr) IsDirectory() bool { - return attr.Mode&os.ModeDir > 0 -} - -type Entry struct { - util.FullPath - - Attr - Extended map[string][]byte - - // the following is for files - Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` -} - -func (entry *Entry) Size() uint64 { - return maxUint64(TotalSize(entry.Chunks), entry.FileSize) -} - -func (entry *Entry) Timestamp() time.Time { - if entry.IsDirectory() { - return entry.Crtime - } else { - return entry.Mtime - } -} - -func (entry *Entry) ToProtoEntry() *filer_pb.Entry { - if entry == nil { - return nil - } - return &filer_pb.Entry{ - Name: entry.FullPath.Name(), - IsDirectory: entry.IsDirectory(), - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, - } -} - -func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { - if entry == nil { - return nil - } - dir, _ := entry.FullPath.DirAndName() - return &filer_pb.FullEntry{ - Dir: dir, - Entry: entry.ToProtoEntry(), - } -} - -func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry { - return &Entry{ - FullPath: util.NewFullPath(dir, entry.Name), - Attr: PbToEntryAttribute(entry.Attributes), - Chunks: entry.Chunks, - } -} - -func maxUint64(x, y uint64) uint64 { - if x > y { - return x - } - return y -} diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go deleted file mode 100644 index 4d615194f..000000000 --- a/weed/filer2/entry_codec.go +++ /dev/null @@ -1,124 +0,0 @@ -package filer2 - -import ( - "bytes" - "fmt" - "os" - "time" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { - message := &filer_pb.Entry{ - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, - } - return proto.Marshal(message) -} - -func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { - - message := &filer_pb.Entry{} - - if err := proto.UnmarshalMerge(blob, message); err != nil { - return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err) - } - - entry.Attr = PbToEntryAttribute(message.Attributes) - - entry.Extended = message.Extended - - entry.Chunks = message.Chunks - - return nil -} - -func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { - - return &filer_pb.FuseAttributes{ - Crtime: entry.Attr.Crtime.Unix(), - Mtime: entry.Attr.Mtime.Unix(), - FileMode: uint32(entry.Attr.Mode), - Uid: entry.Uid, - Gid: entry.Gid, - Mime: entry.Mime, - Collection: entry.Attr.Collection, - Replication: entry.Attr.Replication, - TtlSec: entry.Attr.TtlSec, - UserName: entry.Attr.UserName, - GroupName: entry.Attr.GroupNames, - SymlinkTarget: entry.Attr.SymlinkTarget, - Md5: entry.Attr.Md5, - FileSize: entry.Attr.FileSize, - } -} - -func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { - - t := Attr{} - - t.Crtime = time.Unix(attr.Crtime, 0) - t.Mtime = time.Unix(attr.Mtime, 0) - t.Mode = os.FileMode(attr.FileMode) - t.Uid = attr.Uid - t.Gid = attr.Gid - t.Mime = attr.Mime - t.Collection = attr.Collection - t.Replication = attr.Replication - t.TtlSec = attr.TtlSec - t.UserName = attr.UserName - t.GroupNames = attr.GroupName - t.SymlinkTarget = attr.SymlinkTarget - t.Md5 = attr.Md5 - t.FileSize = attr.FileSize - - return t -} - -func EqualEntry(a, b *Entry) bool { - if a == b { - return true - } - if a == nil && b != nil || a != nil && b == nil { - return false - } - if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) { - return false - } - if len(a.Chunks) != len(b.Chunks) { - return false - } - - if !eq(a.Extended, b.Extended) { - return false - } - - if !bytes.Equal(a.Md5, b.Md5) { - return false - } - - for i := 0; i < len(a.Chunks); i++ { - if !proto.Equal(a.Chunks[i], b.Chunks[i]) { - return false - } - } - return true -} - -func eq(a, b map[string][]byte) bool { - if len(a) != len(b) { - return false - } - - for k, v := range a { - if w, ok := b[k]; !ok || !bytes.Equal(v, w) { - return false - } - } - - return true -} diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go deleted file mode 100644 index 6f4c3ce5c..000000000 --- a/weed/filer2/etcd/etcd_store.go +++ /dev/null @@ -1,204 +0,0 @@ -package etcd - -import ( - "context" - "fmt" - "strings" - "time" - - "go.etcd.io/etcd/clientv3" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - DIR_FILE_SEPARATOR = byte(0x00) -) - -func init() { - filer2.Stores = append(filer2.Stores, &EtcdStore{}) -} - -type EtcdStore struct { - client *clientv3.Client -} - -func (store *EtcdStore) GetName() string { - return "etcd" -} - -func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - servers := configuration.GetString(prefix + "servers") - if servers == "" { - servers = "localhost:2379" - } - - timeout := configuration.GetString(prefix + "timeout") - if timeout == "" { - timeout = "3s" - } - - return store.initialize(servers, timeout) -} - -func (store *EtcdStore) initialize(servers string, timeout string) (err error) { - glog.Infof("filer store etcd: %s", servers) - - to, err := time.ParseDuration(timeout) - if err != nil { - return fmt.Errorf("parse timeout %s: %s", timeout, err) - } - - store.client, err = clientv3.New(clientv3.Config{ - Endpoints: strings.Split(servers, ","), - DialTimeout: to, - }) - if err != nil { - return fmt.Errorf("connect to etcd %s: %s", servers, err) - } - - return -} - -func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *EtcdStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *EtcdStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - key := genKey(entry.DirAndName()) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - if _, err := store.client.Put(ctx, string(key), string(value)); err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - return nil -} - -func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(ctx, entry) -} - -func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { - key := genKey(fullpath.DirAndName()) - - resp, err := store.client.Get(ctx, string(key)) - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - if len(resp.Kvs) == 0 { - return nil, filer_pb.ErrNotFound - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { - key := genKey(fullpath.DirAndName()) - - if _, err := store.client.Delete(ctx, string(key)); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { - return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err) - } - - return nil -} - -func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - resp, err := store.client.Get(ctx, string(directoryPrefix), - clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - - for _, kv := range resp.Kvs { - fileName := getNameFromKey(kv.Key) - if fileName == "" { - continue - } - if fileName == startFileName && !inclusive { - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: weed_util.NewFullPath(string(fullpath), fileName), - } - if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - - return entries, err -} - -func genKey(dirPath, fileName string) (key []byte) { - key = []byte(dirPath) - key = append(key, DIR_FILE_SEPARATOR) - key = append(key, []byte(fileName)...) - return key -} - -func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { - keyPrefix = []byte(string(fullpath)) - keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix -} - -func getNameFromKey(key []byte) string { - sepIndex := len(key) - 1 - for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR { - sepIndex-- - } - - return string(key[sepIndex+1:]) -} - -func (store *EtcdStore) Shutdown() { - store.client.Close() -} diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go deleted file mode 100644 index ba4625bab..000000000 --- a/weed/filer2/filechunk_manifest.go +++ /dev/null @@ -1,168 +0,0 @@ -package filer2 - -import ( - "bytes" - "fmt" - "io" - "math" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - ManifestBatch = 1000 -) - -func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { - for _, chunk := range chunks { - if chunk.IsChunkManifest { - return true - } - } - return false -} - -func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) { - for _, c := range chunks { - if c.IsChunkManifest { - manifestChunks = append(manifestChunks, c) - } else { - nonManifestChunks = append(nonManifestChunks, c) - } - } - return -} - -func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { - // TODO maybe parallel this - for _, chunk := range chunks { - if !chunk.IsChunkManifest { - dataChunks = append(dataChunks, chunk) - continue - } - - resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) - if err != nil { - return chunks, nil, err - } - - manifestChunks = append(manifestChunks, chunk) - // recursive - dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks) - if subErr != nil { - return chunks, nil, subErr - } - dataChunks = append(dataChunks, dchunks...) - manifestChunks = append(manifestChunks, mchunks...) - } - return -} - -func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { - if !chunk.IsChunkManifest { - return - } - - // IsChunkManifest - data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) - if err != nil { - return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) - } - m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { - return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) - } - - // recursive - filer_pb.AfterEntryDeserialization(m.Chunks) - return m.Chunks, nil -} - -// TODO fetch from cache for weed mount? -func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { - urlString, err := lookupFileIdFn(fileId) - if err != nil { - glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err - } - var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(0).Infof("read %s failed, err: %v", fileId, err) - return nil, err - } - - return buffer.Bytes(), nil -} - -func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { - return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest) -} - -func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) { - - var dataChunks []*filer_pb.FileChunk - for _, chunk := range inputChunks { - if !chunk.IsChunkManifest { - dataChunks = append(dataChunks, chunk) - } else { - chunks = append(chunks, chunk) - } - } - - remaining := len(dataChunks) - for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor { - chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor]) - if err != nil { - return dataChunks, err - } - chunks = append(chunks, chunk) - remaining -= mergeFactor - } - // remaining - for i := len(dataChunks) - remaining; i < len(dataChunks); i++ { - chunks = append(chunks, dataChunks[i]) - } - return -} - -func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { - - filer_pb.BeforeEntrySerialization(dataChunks) - - // create and serialize the manifest - data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{ - Chunks: dataChunks, - }) - if serErr != nil { - return nil, fmt.Errorf("serializing manifest: %v", serErr) - } - - minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) - for _, chunk := range dataChunks { - if minOffset > int64(chunk.Offset) { - minOffset = chunk.Offset - } - if maxOffset < int64(chunk.Size)+chunk.Offset { - maxOffset = int64(chunk.Size) + chunk.Offset - } - } - - manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0) - if err != nil { - return nil, err - } - manifestChunk.IsChunkManifest = true - manifestChunk.Offset = minOffset - manifestChunk.Size = uint64(maxOffset - minOffset) - - return -} - -type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) diff --git a/weed/filer2/filechunk_manifest_test.go b/weed/filer2/filechunk_manifest_test.go deleted file mode 100644 index 2b0862d07..000000000 --- a/weed/filer2/filechunk_manifest_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package filer2 - -import ( - "bytes" - "math" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func TestDoMaybeManifestize(t *testing.T) { - var manifestTests = []struct { - inputs []*filer_pb.FileChunk - expected []*filer_pb.FileChunk - }{ - { - inputs: []*filer_pb.FileChunk{ - {FileId: "1", IsChunkManifest: false}, - {FileId: "2", IsChunkManifest: false}, - {FileId: "3", IsChunkManifest: false}, - {FileId: "4", IsChunkManifest: false}, - }, - expected: []*filer_pb.FileChunk{ - {FileId: "12", IsChunkManifest: true}, - {FileId: "34", IsChunkManifest: true}, - }, - }, - { - inputs: []*filer_pb.FileChunk{ - {FileId: "1", IsChunkManifest: true}, - {FileId: "2", IsChunkManifest: false}, - {FileId: "3", IsChunkManifest: false}, - {FileId: "4", IsChunkManifest: false}, - }, - expected: []*filer_pb.FileChunk{ - {FileId: "1", IsChunkManifest: true}, - {FileId: "23", IsChunkManifest: true}, - {FileId: "4", IsChunkManifest: false}, - }, - }, - { - inputs: []*filer_pb.FileChunk{ - {FileId: "1", IsChunkManifest: false}, - {FileId: "2", IsChunkManifest: true}, - {FileId: "3", IsChunkManifest: false}, - {FileId: "4", IsChunkManifest: false}, - }, - expected: []*filer_pb.FileChunk{ - {FileId: "2", IsChunkManifest: true}, - {FileId: "13", IsChunkManifest: true}, - {FileId: "4", IsChunkManifest: false}, - }, - }, - { - inputs: []*filer_pb.FileChunk{ - {FileId: "1", IsChunkManifest: true}, - {FileId: "2", IsChunkManifest: true}, - {FileId: "3", IsChunkManifest: false}, - {FileId: "4", IsChunkManifest: false}, - }, - expected: []*filer_pb.FileChunk{ - {FileId: "1", IsChunkManifest: true}, - {FileId: "2", IsChunkManifest: true}, - {FileId: "34", IsChunkManifest: true}, - }, - }, - } - - for i, mtest := range manifestTests { - println("test", i) - actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge) - assertEqualChunks(t, mtest.expected, actual) - } - -} - -func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) { - assert.Equal(t, len(expected), len(actual)) - for i := 0; i < len(actual); i++ { - assertEqualChunk(t, actual[i], expected[i]) - } -} -func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) { - assert.Equal(t, expected.FileId, actual.FileId) - assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest) -} - -func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { - - var buf bytes.Buffer - minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) - for k := 0; k < len(dataChunks); k++ { - chunk := dataChunks[k] - buf.WriteString(chunk.FileId) - if minOffset > int64(chunk.Offset) { - minOffset = chunk.Offset - } - if maxOffset < int64(chunk.Size)+chunk.Offset { - maxOffset = int64(chunk.Size) + chunk.Offset - } - } - - manifestChunk = &filer_pb.FileChunk{ - FileId: buf.String(), - } - manifestChunk.IsChunkManifest = true - manifestChunk.Offset = minOffset - manifestChunk.Size = uint64(maxOffset - minOffset) - - return -} diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go deleted file mode 100644 index 53c679d6b..000000000 --- a/weed/filer2/filechunks.go +++ /dev/null @@ -1,284 +0,0 @@ -package filer2 - -import ( - "fmt" - "hash/fnv" - "math" - "sort" - "sync" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { - for _, c := range chunks { - t := uint64(c.Offset + int64(c.Size)) - if size < t { - size = t - } - } - return -} - -func FileSize(entry *filer_pb.Entry) (size uint64) { - return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize) -} - -func ETag(entry *filer_pb.Entry) (etag string) { - if entry.Attributes == nil || entry.Attributes.Md5 == nil { - return ETagChunks(entry.Chunks) - } - return fmt.Sprintf("%x", entry.Attributes.Md5) -} - -func ETagEntry(entry *Entry) (etag string) { - if entry.Attr.Md5 == nil { - return ETagChunks(entry.Chunks) - } - return fmt.Sprintf("%x", entry.Attr.Md5) -} - -func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { - if len(chunks) == 1 { - return chunks[0].ETag - } - - h := fnv.New32a() - for _, c := range chunks { - h.Write([]byte(c.ETag)) - } - return fmt.Sprintf("%x", h.Sum32()) -} - -func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - - visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) - - fileIds := make(map[string]bool) - for _, interval := range visibles { - fileIds[interval.fileId] = true - } - for _, chunk := range chunks { - if _, found := fileIds[chunk.GetFileIdString()]; found { - compacted = append(compacted, chunk) - } else { - garbage = append(garbage, chunk) - } - } - - return -} - -func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { - - aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as) - if aErr != nil { - return nil, aErr - } - bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs) - if bErr != nil { - return nil, bErr - } - - delta = append(delta, DoMinusChunks(aData, bData)...) - delta = append(delta, DoMinusChunks(aMeta, bMeta)...) - return -} - -func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { - - fileIds := make(map[string]bool) - for _, interval := range bs { - fileIds[interval.GetFileIdString()] = true - } - for _, chunk := range as { - if _, found := fileIds[chunk.GetFileIdString()]; !found { - delta = append(delta, chunk) - } - } - - return -} - -type ChunkView struct { - FileId string - Offset int64 - Size uint64 - LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk - ChunkSize uint64 - CipherKey []byte - IsGzipped bool -} - -func (cv *ChunkView) IsFullChunk() bool { - return cv.Size == cv.ChunkSize -} - -func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { - - visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) - - return ViewFromVisibleIntervals(visibles, offset, size) - -} - -func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) { - - stop := offset + size - if size == math.MaxInt64 { - stop = math.MaxInt64 - } - if stop < offset { - stop = math.MaxInt64 - } - - for _, chunk := range visibles { - - chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop) - - if chunkStart < chunkStop { - views = append(views, &ChunkView{ - FileId: chunk.fileId, - Offset: chunkStart - chunk.start + chunk.chunkOffset, - Size: uint64(chunkStop - chunkStart), - LogicOffset: chunkStart, - ChunkSize: chunk.chunkSize, - CipherKey: chunk.cipherKey, - IsGzipped: chunk.isGzipped, - }) - } - } - - return views - -} - -func logPrintf(name string, visibles []VisibleInterval) { - - /* - glog.V(0).Infof("%s len %d", name, len(visibles)) - for _, v := range visibles { - glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset) - } - */ -} - -var bufPool = sync.Pool{ - New: func() interface{} { - return new(VisibleInterval) - }, -} - -func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) { - - newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed) - - length := len(visibles) - if length == 0 { - return append(visibles, newV) - } - last := visibles[length-1] - if last.stop <= chunk.Offset { - return append(visibles, newV) - } - - logPrintf(" before", visibles) - // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size) - chunkStop := chunk.Offset + int64(chunk.Size) - for _, v := range visibles { - if v.start < chunk.Offset && chunk.Offset < v.stop { - t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped) - newVisibles = append(newVisibles, t) - // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop) - } - if v.start < chunkStop && chunkStop < v.stop { - t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped) - newVisibles = append(newVisibles, t) - // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop) - } - if chunkStop <= v.start || v.stop <= chunk.Offset { - newVisibles = append(newVisibles, v) - // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop) - } - } - newVisibles = append(newVisibles, newV) - - logPrintf(" append", newVisibles) - - for i := len(newVisibles) - 1; i >= 0; i-- { - if i > 0 && newV.start < newVisibles[i-1].start { - newVisibles[i] = newVisibles[i-1] - } else { - newVisibles[i] = newV - break - } - } - logPrintf(" sorted", newVisibles) - - return newVisibles -} - -// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory -// If the file chunk content is a chunk manifest -func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) { - - chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks) - - sort.Slice(chunks, func(i, j int) bool { - if chunks[i].Mtime == chunks[j].Mtime { - return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey - } - return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run - }) - - for _, chunk := range chunks { - - // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) - visibles = MergeIntoVisibles(visibles, chunk) - - logPrintf("add", visibles) - - } - - return -} - -// find non-overlapping visible intervals -// visible interval map to one file chunk - -type VisibleInterval struct { - start int64 - stop int64 - modifiedTime int64 - fileId string - chunkOffset int64 - chunkSize uint64 - cipherKey []byte - isGzipped bool -} - -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval { - return VisibleInterval{ - start: start, - stop: stop, - fileId: fileId, - modifiedTime: modifiedTime, - chunkOffset: chunkOffset, // the starting position in the chunk - chunkSize: chunkSize, - cipherKey: cipherKey, - isGzipped: isGzipped, - } -} - -func min(x, y int64) int64 { - if x <= y { - return x - } - return y -} -func max(x, y int64) int64 { - if x <= y { - return y - } - return x -} diff --git a/weed/filer2/filechunks2_test.go b/weed/filer2/filechunks2_test.go deleted file mode 100644 index d896da3cc..000000000 --- a/weed/filer2/filechunks2_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package filer2 - -import ( - "sort" - "testing" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func TestCompactFileChunksRealCase(t *testing.T) { - - chunks := []*filer_pb.FileChunk{ - {FileId: "2,512f31f2c0700a", Offset: 0, Size: 25 - 0, Mtime: 5320497}, - {FileId: "6,512f2c2e24e9e8", Offset: 868352, Size: 917585 - 868352, Mtime: 5320492}, - {FileId: "7,514468dd5954ca", Offset: 884736, Size: 901120 - 884736, Mtime: 5325928}, - {FileId: "5,5144463173fe77", Offset: 917504, Size: 2297856 - 917504, Mtime: 5325894}, - {FileId: "4,51444c7ab54e2d", Offset: 2301952, Size: 2367488 - 2301952, Mtime: 5325900}, - {FileId: "4,514450e643ad22", Offset: 2371584, Size: 2420736 - 2371584, Mtime: 5325904}, - {FileId: "6,514456a5e9e4d7", Offset: 2449408, Size: 2490368 - 2449408, Mtime: 5325910}, - {FileId: "3,51444f8d53eebe", Offset: 2494464, Size: 2555904 - 2494464, Mtime: 5325903}, - {FileId: "4,5144578b097c7e", Offset: 2560000, Size: 2596864 - 2560000, Mtime: 5325911}, - {FileId: "3,51445500b6b4ac", Offset: 2637824, Size: 2678784 - 2637824, Mtime: 5325909}, - {FileId: "1,51446285e52a61", Offset: 2695168, Size: 2715648 - 2695168, Mtime: 5325922}, - } - - printChunks("before", chunks) - - compacted, garbage := CompactFileChunks(nil, chunks) - - printChunks("compacted", compacted) - printChunks("garbage", garbage) - -} - -func printChunks(name string, chunks []*filer_pb.FileChunk) { - sort.Slice(chunks, func(i, j int) bool { - if chunks[i].Offset == chunks[j].Offset { - return chunks[i].Mtime < chunks[j].Mtime - } - return chunks[i].Offset < chunks[j].Offset - }) - for _, chunk := range chunks { - glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) - } -} diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go deleted file mode 100644 index 31b74a22a..000000000 --- a/weed/filer2/filechunks_test.go +++ /dev/null @@ -1,539 +0,0 @@ -package filer2 - -import ( - "fmt" - "log" - "math" - "math/rand" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func TestCompactFileChunks(t *testing.T) { - chunks := []*filer_pb.FileChunk{ - {Offset: 10, Size: 100, FileId: "abc", Mtime: 50}, - {Offset: 100, Size: 100, FileId: "def", Mtime: 100}, - {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200}, - {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300}, - } - - compacted, garbage := CompactFileChunks(nil, chunks) - - if len(compacted) != 3 { - t.Fatalf("unexpected compacted: %d", len(compacted)) - } - if len(garbage) != 1 { - t.Fatalf("unexpected garbage: %d", len(garbage)) - } - -} - -func TestCompactFileChunks2(t *testing.T) { - - chunks := []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 50}, - {Offset: 100, Size: 100, FileId: "def", Mtime: 100}, - {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200}, - {Offset: 0, Size: 100, FileId: "abcf", Mtime: 300}, - {Offset: 50, Size: 100, FileId: "fhfh", Mtime: 400}, - {Offset: 100, Size: 100, FileId: "yuyu", Mtime: 500}, - } - - k := 3 - - for n := 0; n < k; n++ { - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n), - }) - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k), - }) - } - - compacted, garbage := CompactFileChunks(nil, chunks) - - if len(compacted) != 4 { - t.Fatalf("unexpected compacted: %d", len(compacted)) - } - if len(garbage) != 8 { - t.Fatalf("unexpected garbage: %d", len(garbage)) - } -} - -func TestRandomFileChunksCompact(t *testing.T) { - - data := make([]byte, 1024) - - var chunks []*filer_pb.FileChunk - for i := 0; i < 15; i++ { - start, stop := rand.Intn(len(data)), rand.Intn(len(data)) - if start > stop { - start, stop = stop, start - } - if start+16 < stop { - stop = start + 16 - } - chunk := &filer_pb.FileChunk{ - FileId: strconv.Itoa(i), - Offset: int64(start), - Size: uint64(stop - start), - Mtime: int64(i), - Fid: &filer_pb.FileId{FileKey: uint64(i)}, - } - chunks = append(chunks, chunk) - for x := start; x < stop; x++ { - data[x] = byte(i) - } - } - - visibles, _ := NonOverlappingVisibleIntervals(nil, chunks) - - for _, v := range visibles { - for x := v.start; x < v.stop; x++ { - assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId) - } - } - -} - -func TestIntervalMerging(t *testing.T) { - - testcases := []struct { - Chunks []*filer_pb.FileChunk - Expected []*VisibleInterval - }{ - // case 0: normal - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, - {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 100, fileId: "abc"}, - {start: 100, stop: 200, fileId: "asdf"}, - {start: 200, stop: 300, fileId: "fsad"}, - }, - }, - // case 1: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "asdf"}, - }, - }, - // case 2: updates overwrite part of previous chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, - {Offset: 0, Size: 70, FileId: "b", Mtime: 134}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 70, fileId: "b"}, - {start: 70, stop: 100, fileId: "a", chunkOffset: 70}, - }, - }, - // case 3: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 50, fileId: "asdf"}, - {start: 50, stop: 300, fileId: "xxxx"}, - }, - }, - // case 4: updates far away from prev chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "asdf"}, - {start: 250, stop: 500, fileId: "xxxx"}, - }, - }, - // case 5: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "d", Mtime: 184}, - {Offset: 70, Size: 150, FileId: "c", Mtime: 143}, - {Offset: 80, Size: 100, FileId: "b", Mtime: 134}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "d"}, - {start: 200, stop: 220, fileId: "c", chunkOffset: 130}, - }, - }, - // case 6: same updates - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123}, - {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123}, - {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 100, fileId: "xyz"}, - }, - }, - // case 7: real updates - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123}, - {Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130}, - {Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140}, - {Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150}, - {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160}, - {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 2097152, fileId: "3,029565bf3092"}, - {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"}, - {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"}, - {start: 8388608, stop: 11534336, fileId: "5,02982f80de50"}, - {start: 11534336, stop: 14376529, fileId: "7,0299ad723803"}, - }, - }, - // case 8: real bug - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 77824, FileId: "4,0b3df938e301", Mtime: 123}, - {Offset: 471040, Size: 472225 - 471040, FileId: "6,0b3e0650019c", Mtime: 130}, - {Offset: 77824, Size: 208896 - 77824, FileId: "4,0b3f0c7202f0", Mtime: 140}, - {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150}, - {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 77824, fileId: "4,0b3df938e301"}, - {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"}, - {start: 208896, stop: 339968, fileId: "2,0b4031a72689"}, - {start: 339968, stop: 471040, fileId: "3,0b416a557362"}, - {start: 471040, stop: 472225, fileId: "6,0b3e0650019c"}, - }, - }, - } - - for i, testcase := range testcases { - log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) - intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks) - for x, interval := range intervals { - log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", - i, x, interval.start, interval.stop, interval.fileId) - } - for x, interval := range intervals { - if interval.start != testcase.Expected[x].start { - t.Fatalf("failed on test case %d, interval %d, start %d, expect %d", - i, x, interval.start, testcase.Expected[x].start) - } - if interval.stop != testcase.Expected[x].stop { - t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d", - i, x, interval.stop, testcase.Expected[x].stop) - } - if interval.fileId != testcase.Expected[x].fileId { - t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s", - i, x, interval.fileId, testcase.Expected[x].fileId) - } - if interval.chunkOffset != testcase.Expected[x].chunkOffset { - t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d", - i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset) - } - } - if len(intervals) != len(testcase.Expected) { - t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected)) - } - - } - -} - -func TestChunksReading(t *testing.T) { - - testcases := []struct { - Chunks []*filer_pb.FileChunk - Offset int64 - Size int64 - Expected []*ChunkView - }{ - // case 0: normal - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, - {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, - }, - Offset: 0, - Size: 250, - Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, - {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200}, - }, - }, - // case 1: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - }, - Offset: 50, - Size: 100, - Expected: []*ChunkView{ - {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50}, - }, - }, - // case 2: updates overwrite part of previous chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 3, Size: 100, FileId: "a", Mtime: 123}, - {Offset: 10, Size: 50, FileId: "b", Mtime: 134}, - }, - Offset: 30, - Size: 40, - Expected: []*ChunkView{ - {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30}, - {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60}, - }, - }, - // case 3: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Offset: 0, - Size: 200, - Expected: []*ChunkView{ - {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50}, - }, - }, - // case 4: updates far away from prev chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Offset: 0, - Size: 400, - Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250}, - }, - }, - // case 5: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "a", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "c", Mtime: 184}, - {Offset: 70, Size: 150, FileId: "b", Mtime: 143}, - {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, - }, - Offset: 0, - Size: 220, - Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0}, - {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200}, - }, - }, - // case 6: same updates - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123}, - {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123}, - {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123}, - }, - Offset: 0, - Size: 100, - Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0}, - }, - }, - // case 7: edge cases - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, - {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, - }, - Offset: 0, - Size: 200, - Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, - }, - }, - // case 8: edge cases - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 90, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 190, Size: 300, FileId: "fsad", Mtime: 353}, - }, - Offset: 0, - Size: 300, - Expected: []*ChunkView{ - {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90}, - {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190}, - }, - }, - // case 9: edge cases - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1}, - {Offset: 43175936, Size: 52981771 - 43175936, FileId: "2,112a36ea7f85", Mtime: 2}, - {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3}, - {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4}, - {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5}, - {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6}, - }, - Offset: 0, - Size: 153578836, - Expected: []*ChunkView{ - {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0}, - {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936}, - {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760}, - {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736}, - {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168}, - {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248}, - }, - }, - } - - for i, testcase := range testcases { - if i != 2 { - // continue - } - log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) - chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size) - for x, chunk := range chunks { - log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", - i, x, chunk.Offset, chunk.Size, chunk.FileId) - if chunk.Offset != testcase.Expected[x].Offset { - t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d", - i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset) - } - if chunk.Size != testcase.Expected[x].Size { - t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d", - i, chunk.FileId, chunk.Size, testcase.Expected[x].Size) - } - if chunk.FileId != testcase.Expected[x].FileId { - t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s", - i, x, chunk.FileId, testcase.Expected[x].FileId) - } - if chunk.LogicOffset != testcase.Expected[x].LogicOffset { - t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d", - i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset) - } - } - if len(chunks) != len(testcase.Expected) { - t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected)) - } - } - -} - -func BenchmarkCompactFileChunks(b *testing.B) { - - var chunks []*filer_pb.FileChunk - - k := 1024 - - for n := 0; n < k; n++ { - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n), - }) - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k), - }) - } - - for n := 0; n < b.N; n++ { - CompactFileChunks(nil, chunks) - } -} - -func TestViewFromVisibleIntervals(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 0, - stop: 25, - fileId: "fid1", - }, - { - start: 4096, - stop: 8192, - fileId: "fid2", - }, - { - start: 16384, - stop: 18551, - fileId: "fid3", - }, - } - - views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) - - if len(views) != len(visibles) { - assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") - } - -} - -func TestViewFromVisibleIntervals2(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 344064, - stop: 348160, - fileId: "fid1", - }, - { - start: 348160, - stop: 356352, - fileId: "fid2", - }, - } - - views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32) - - if len(views) != len(visibles) { - assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") - } - -} - -func TestViewFromVisibleIntervals3(t *testing.T) { - visibles := []VisibleInterval{ - { - start: 1000, - stop: 2000, - fileId: "fid1", - }, - { - start: 3000, - stop: 4000, - fileId: "fid2", - }, - } - - views := ViewFromVisibleIntervals(visibles, 1700, 1500) - - if len(views) != len(visibles) { - assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error") - } - -} diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go deleted file mode 100644 index 78e27ed08..000000000 --- a/weed/filer2/filer.go +++ /dev/null @@ -1,290 +0,0 @@ -package filer2 - -import ( - "context" - "errors" - "fmt" - "os" - "strings" - "time" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -const ( - LogFlushInterval = time.Minute - PaginationSize = 1024 * 256 -) - -var ( - OS_UID = uint32(os.Getuid()) - OS_GID = uint32(os.Getgid()) - ErrUnsupportedListDirectoryPrefixed = errors.New("UNSUPPORTED") -) - -type Filer struct { - Store *FilerStoreWrapper - MasterClient *wdclient.MasterClient - fileIdDeletionQueue *util.UnboundedQueue - GrpcDialOption grpc.DialOption - DirBucketsPath string - FsyncBuckets []string - buckets *FilerBuckets - Cipher bool - LocalMetaLogBuffer *log_buffer.LogBuffer - metaLogCollection string - metaLogReplication string - MetaAggregator *MetaAggregator - Signature int32 -} - -func NewFiler(masters []string, grpcDialOption grpc.DialOption, - filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { - f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), - fileIdDeletionQueue: util.NewUnboundedQueue(), - GrpcDialOption: grpcDialOption, - Signature: util.RandomInt32(), - } - f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) - f.metaLogCollection = collection - f.metaLogReplication = replication - - go f.loopProcessingDeletion() - - return f -} - -func (f *Filer) AggregateFromPeers(self string, filers []string) { - - // set peers - if len(filers) == 0 { - filers = append(filers, self) - } - f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(f, self) - -} - -func (f *Filer) SetStore(store FilerStore) { - f.Store = NewFilerStoreWrapper(store) -} - -func (f *Filer) GetStore() (store FilerStore) { - return f.Store -} - -func (fs *Filer) GetMaster() string { - return fs.MasterClient.GetMaster() -} - -func (fs *Filer) KeepConnectedToMaster() { - fs.MasterClient.KeepConnectedToMaster() -} - -func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { - return f.Store.BeginTransaction(ctx) -} - -func (f *Filer) CommitTransaction(ctx context.Context) error { - return f.Store.CommitTransaction(ctx) -} - -func (f *Filer) RollbackTransaction(ctx context.Context) error { - return f.Store.RollbackTransaction(ctx) -} - -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { - - if string(entry.FullPath) == "/" { - return nil - } - - dirParts := strings.Split(string(entry.FullPath), "/") - - // fmt.Printf("directory parts: %+v\n", dirParts) - - var lastDirectoryEntry *Entry - - for i := 1; i < len(dirParts); i++ { - dirPath := "/" + util.Join(dirParts[:i]...) - // fmt.Printf("%d directory: %+v\n", i, dirPath) - - // check the store directly - glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) - - // no such existing directory - if dirEntry == nil { - - // create the directory - now := time.Now() - - dirEntry = &Entry{ - FullPath: util.FullPath(dirPath), - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | entry.Mode | 0110, - Uid: entry.Uid, - Gid: entry.Gid, - Collection: entry.Collection, - Replication: entry.Replication, - UserName: entry.UserName, - GroupNames: entry.GroupNames, - }, - } - - glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.Store.InsertEntry(ctx, dirEntry) - if mkdirErr != nil { - if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { - glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) - return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) - } - } else { - f.maybeAddBucket(dirEntry) - f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) - } - - } else if !dirEntry.IsDirectory() { - glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) - return fmt.Errorf("%s is a file", dirPath) - } - - // remember the direct parent directory entry - if i == len(dirParts)-1 { - lastDirectoryEntry = dirEntry - } - - } - - if lastDirectoryEntry == nil { - glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath) - return fmt.Errorf("parent folder not found: %v", entry.FullPath) - } - - /* - if !hasWritePermission(lastDirectoryEntry, entry) { - glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d", - lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid) - return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath) - } - */ - - oldEntry, _ := f.FindEntry(ctx, entry.FullPath) - - glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl) - if oldEntry == nil { - if err := f.Store.InsertEntry(ctx, entry); err != nil { - glog.Errorf("insert entry %s: %v", entry.FullPath, err) - return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) - } - } else { - if o_excl { - glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath) - return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath) - } - if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil { - glog.Errorf("update entry %s: %v", entry.FullPath, err) - return fmt.Errorf("update entry %s: %v", entry.FullPath, err) - } - } - - f.maybeAddBucket(entry) - f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) - - f.deleteChunksIfNotNew(oldEntry, entry) - - glog.V(4).Infof("CreateEntry %s: created", entry.FullPath) - - return nil -} - -func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { - if oldEntry != nil { - if oldEntry.IsDirectory() && !entry.IsDirectory() { - glog.Errorf("existing %s is a directory", entry.FullPath) - return fmt.Errorf("existing %s is a directory", entry.FullPath) - } - if !oldEntry.IsDirectory() && entry.IsDirectory() { - glog.Errorf("existing %s is a file", entry.FullPath) - return fmt.Errorf("existing %s is a file", entry.FullPath) - } - } - return f.Store.UpdateEntry(ctx, entry) -} - -func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) { - - now := time.Now() - - if string(p) == "/" { - return &Entry{ - FullPath: p, - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | 0755, - Uid: OS_UID, - Gid: OS_GID, - }, - }, nil - } - entry, err = f.Store.FindEntry(ctx, p) - if entry != nil && entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - f.Store.DeleteEntry(ctx, p.Child(entry.Name())) - return nil, filer_pb.ErrNotFound - } - } - return - -} - -func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) { - if strings.HasSuffix(string(p), "/") && len(p) > 1 { - p = p[0 : len(p)-1] - } - - var makeupEntries []*Entry - entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix) - for expiredCount > 0 && err == nil { - makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix) - if err == nil { - entries = append(entries, makeupEntries...) - } - } - - return entries, err -} - -func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) { - listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix) - if listErr != nil { - return listedEntries, expiredCount, "", listErr - } - for _, entry := range listedEntries { - lastFileName = entry.Name() - if entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - f.Store.DeleteEntry(ctx, p.Child(entry.Name())) - expiredCount++ - continue - } - } - entries = append(entries, entry) - } - return -} - -func (f *Filer) Shutdown() { - f.LocalMetaLogBuffer.Shutdown() - f.Store.Shutdown() -} diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go deleted file mode 100644 index 6b7c2c31a..000000000 --- a/weed/filer2/filer_buckets.go +++ /dev/null @@ -1,121 +0,0 @@ -package filer2 - -import ( - "context" - "math" - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type BucketName string -type BucketOption struct { - Name BucketName - Replication string - fsync bool -} -type FilerBuckets struct { - dirBucketsPath string - buckets map[BucketName]*BucketOption - sync.RWMutex -} - -func (f *Filer) LoadBuckets() { - - f.buckets = &FilerBuckets{ - buckets: make(map[BucketName]*BucketOption), - } - - limit := math.MaxInt32 - - entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "") - - if err != nil { - glog.V(1).Infof("no buckets found: %v", err) - return - } - - shouldFsyncMap := make(map[string]bool) - for _, bucket := range f.FsyncBuckets { - shouldFsyncMap[bucket] = true - } - - glog.V(1).Infof("buckets found: %d", len(entries)) - - f.buckets.Lock() - for _, entry := range entries { - _, shouldFsnyc := shouldFsyncMap[entry.Name()] - f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ - Name: BucketName(entry.Name()), - Replication: entry.Replication, - fsync: shouldFsnyc, - } - } - f.buckets.Unlock() - -} - -func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) { - - f.buckets.RLock() - defer f.buckets.RUnlock() - - option, found := f.buckets.buckets[BucketName(buketName)] - - if !found { - return "", false - } - return option.Replication, option.fsync - -} - -func (f *Filer) isBucket(entry *Entry) bool { - if !entry.IsDirectory() { - return false - } - parent, dirName := entry.FullPath.DirAndName() - if parent != f.DirBucketsPath { - return false - } - - f.buckets.RLock() - defer f.buckets.RUnlock() - - _, found := f.buckets.buckets[BucketName(dirName)] - - return found - -} - -func (f *Filer) maybeAddBucket(entry *Entry) { - if !entry.IsDirectory() { - return - } - parent, dirName := entry.FullPath.DirAndName() - if parent != f.DirBucketsPath { - return - } - f.addBucket(dirName, &BucketOption{ - Name: BucketName(dirName), - Replication: entry.Replication, - }) -} - -func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) { - - f.buckets.Lock() - defer f.buckets.Unlock() - - f.buckets.buckets[BucketName(buketName)] = bucketOption - -} - -func (f *Filer) deleteBucket(buketName string) { - - f.buckets.Lock() - defer f.buckets.Unlock() - - delete(f.buckets.buckets, BucketName(buketName)) - -} diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go deleted file mode 100644 index 4d2a1ef43..000000000 --- a/weed/filer2/filer_delete_entry.go +++ /dev/null @@ -1,129 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { - if p == "/" { - return nil - } - - entry, findErr := f.FindEntry(ctx, p) - if findErr != nil { - return findErr - } - - isCollection := f.isBucket(entry) - - var chunks []*filer_pb.FileChunk - chunks = append(chunks, entry.Chunks...) - if entry.IsDirectory() { - // delete the folder children, not including the folder itself - var dirChunks []*filer_pb.FileChunk - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster) - if err != nil { - glog.V(0).Infof("delete directory %s: %v", p, err) - return fmt.Errorf("delete directory %s: %v", p, err) - } - chunks = append(chunks, dirChunks...) - } - - // delete the file or folder - err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures) - if err != nil { - return fmt.Errorf("delete file %s: %v", p, err) - } - - if shouldDeleteChunks && !isCollection { - go f.DeleteChunks(chunks) - } - if isCollection { - collectionName := entry.Name() - f.doDeleteCollection(collectionName) - f.deleteBucket(collectionName) - } - - return nil -} - -func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) { - - lastFileName := "" - includeLastFile := false - for { - entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "") - if err != nil { - glog.Errorf("list folder %s: %v", entry.FullPath, err) - return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) - } - if lastFileName == "" && !isRecursive && len(entries) > 0 { - // only for first iteration in the loop - glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) - return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) - } - - for _, sub := range entries { - lastFileName = sub.Name() - var dirChunks []*filer_pb.FileChunk - if sub.IsDirectory() { - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false) - chunks = append(chunks, dirChunks...) - } else { - f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) - chunks = append(chunks, sub.Chunks...) - } - if err != nil && !ignoreRecursiveError { - return nil, err - } - } - - if len(entries) < PaginationSize { - break - } - } - - glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) - - if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { - return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) - } - - f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil) - - return chunks, nil -} - -func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) { - - glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) - - if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { - return fmt.Errorf("filer store delete: %v", storeDeletionErr) - } - if !entry.IsDirectory() { - f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) - } - - return nil -} - -func (f *Filer) doDeleteCollection(collectionName string) (err error) { - - return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ - Name: collectionName, - }) - if err != nil { - glog.Infof("delete collection %s: %v", collectionName, err) - } - return err - }) - -} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go deleted file mode 100644 index dbee4a61d..000000000 --- a/weed/filer2/filer_deletion.go +++ /dev/null @@ -1,109 +0,0 @@ -package filer2 - -import ( - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) { - return func(vids []string) (map[string]operation.LookupResult, error) { - m := make(map[string]operation.LookupResult) - for _, vid := range vids { - locs, _ := masterClient.GetVidLocations(vid) - var locations []operation.Location - for _, loc := range locs { - locations = append(locations, operation.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = operation.LookupResult{ - VolumeId: vid, - Locations: locations, - } - } - return m, nil - } -} - -func (f *Filer) loopProcessingDeletion() { - - lookupFunc := LookupByMasterClientFn(f.MasterClient) - - DeletionBatchSize := 100000 // roughly 20 bytes cost per file id. - - var deletionCount int - for { - deletionCount = 0 - f.fileIdDeletionQueue.Consume(func(fileIds []string) { - for len(fileIds) > 0 { - var toDeleteFileIds []string - if len(fileIds) > DeletionBatchSize { - toDeleteFileIds = fileIds[:DeletionBatchSize] - fileIds = fileIds[DeletionBatchSize:] - } else { - toDeleteFileIds = fileIds - fileIds = fileIds[:0] - } - deletionCount = len(toDeleteFileIds) - _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc) - if err != nil { - if !strings.Contains(err.Error(), "already deleted") { - glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) - } - } else { - glog.V(1).Infof("deleting fileIds len=%d", deletionCount) - } - } - }) - - if deletionCount == 0 { - time.Sleep(1123 * time.Millisecond) - } - } -} - -func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { - for _, chunk := range chunks { - if !chunk.IsChunkManifest { - f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) - continue - } - dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) - if manifestResolveErr != nil { - glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) - } - for _, dChunk := range dataChunks { - f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString()) - } - f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) - } -} - -func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { - - if oldEntry == nil { - return - } - if newEntry == nil { - f.DeleteChunks(oldEntry.Chunks) - } - - var toDelete []*filer_pb.FileChunk - newChunkIds := make(map[string]bool) - for _, newChunk := range newEntry.Chunks { - newChunkIds[newChunk.GetFileIdString()] = true - } - - for _, oldChunk := range oldEntry.Chunks { - if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { - toDelete = append(toDelete, oldChunk) - } - } - f.DeleteChunks(toDelete) -} diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go deleted file mode 100644 index a95fcf0b4..000000000 --- a/weed/filer2/filer_notify.go +++ /dev/null @@ -1,169 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "io" - "strings" - "time" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/notification" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) { - var fullpath string - if oldEntry != nil { - fullpath = string(oldEntry.FullPath) - } else if newEntry != nil { - fullpath = string(newEntry.FullPath) - } else { - return - } - - // println("fullpath:", fullpath) - - if strings.HasPrefix(fullpath, SystemLogDir) { - return - } - - newParentPath := "" - if newEntry != nil { - newParentPath, _ = newEntry.FullPath.DirAndName() - } - eventNotification := &filer_pb.EventNotification{ - OldEntry: oldEntry.ToProtoEntry(), - NewEntry: newEntry.ToProtoEntry(), - DeleteChunks: deleteChunks, - NewParentPath: newParentPath, - IsFromOtherCluster: isFromOtherCluster, - Signatures: append(signatures, f.Signature), - } - - if notification.Queue != nil { - glog.V(3).Infof("notifying entry update %v", fullpath) - notification.Queue.SendMessage(fullpath, eventNotification) - } - - f.logMetaEvent(ctx, fullpath, eventNotification) - -} - -func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) { - - dir, _ := util.FullPath(fullpath).DirAndName() - - event := &filer_pb.SubscribeMetadataResponse{ - Directory: dir, - EventNotification: eventNotification, - TsNs: time.Now().UnixNano(), - } - data, err := proto.Marshal(event) - if err != nil { - glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err) - return - } - - f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) - -} - -func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { - - startTime, stopTime = startTime.UTC(), stopTime.UTC() - - targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - // startTime.Second(), startTime.Nanosecond(), - ) - - for { - if err := f.appendToFile(targetFile, buf); err != nil { - glog.V(1).Infof("log write failed %s: %v", targetFile, err) - time.Sleep(737 * time.Millisecond) - } else { - break - } - } -} - -func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { - - startTime = startTime.UTC() - startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) - - sizeBuf := make([]byte, 4) - startTsNs := startTime.UnixNano() - - dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "") - if listDayErr != nil { - return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr) - } - for _, dayEntry := range dayEntries { - // println("checking day", dayEntry.FullPath) - hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "") - if listHourMinuteErr != nil { - return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) - } - for _, hourMinuteEntry := range hourMinuteEntries { - // println("checking hh-mm", hourMinuteEntry.FullPath) - if dayEntry.Name() == startDate { - if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 { - continue - } - } - // println("processing", hourMinuteEntry.FullPath) - chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) - if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { - chunkedFileReader.Close() - if err == io.EOF { - continue - } - return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) - } - chunkedFileReader.Close() - } - } - - return lastTsNs, nil -} - -func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { - for { - n, err := r.Read(sizeBuf) - if err != nil { - return lastTsNs, err - } - if n != 4 { - return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n) - } - size := util.BytesToUint32(sizeBuf) - // println("entry size", size) - entryData := make([]byte, size) - n, err = r.Read(entryData) - if err != nil { - return lastTsNs, err - } - if n != int(size) { - return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) - } - logEntry := &filer_pb.LogEntry{} - if err = proto.Unmarshal(entryData, logEntry); err != nil { - return lastTsNs, err - } - if logEntry.TsNs <= ns { - return lastTsNs, nil - } - // println("each log: ", logEntry.TsNs) - if err := eachLogEntryFn(logEntry); err != nil { - return lastTsNs, err - } else { - lastTsNs = logEntry.TsNs - } - } -} diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go deleted file mode 100644 index 31cdb3c1c..000000000 --- a/weed/filer2/filer_notify_append.go +++ /dev/null @@ -1,73 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (f *Filer) appendToFile(targetFile string, data []byte) error { - - assignResult, uploadResult, err2 := f.assignAndUpload(data) - if err2 != nil { - return err2 - } - - // find out existing entry - fullpath := util.FullPath(targetFile) - entry, err := f.FindEntry(context.Background(), fullpath) - var offset int64 = 0 - if err == filer_pb.ErrNotFound { - entry = &Entry{ - FullPath: fullpath, - Attr: Attr{ - Crtime: time.Now(), - Mtime: time.Now(), - Mode: os.FileMode(0644), - Uid: OS_UID, - Gid: OS_GID, - }, - } - } else { - offset = int64(TotalSize(entry.Chunks)) - } - - // append to existing chunks - entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) - - // update the entry - err = f.CreateEntry(context.Background(), entry, false, false, nil) - - return err -} - -func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { - // assign a volume location - assignRequest := &operation.VolumeAssignRequest{ - Count: 1, - Collection: f.metaLogCollection, - Replication: f.metaLogReplication, - WritableVolumeCount: 1, - } - assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) - if err != nil { - return nil, nil, fmt.Errorf("AssignVolume: %v", err) - } - if assignResult.Error != "" { - return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error) - } - - // upload data - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) - if err != nil { - return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) - } - // println("uploaded to", targetUrl) - return assignResult, uploadResult, nil -} diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go deleted file mode 100644 index 29170bfdf..000000000 --- a/weed/filer2/filer_notify_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package filer2 - -import ( - "testing" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - - "github.com/golang/protobuf/proto" -) - -func TestProtoMarshalText(t *testing.T) { - - oldEntry := &Entry{ - FullPath: util.FullPath("/this/path/to"), - Attr: Attr{ - Mtime: time.Now(), - Mode: 0644, - Uid: 1, - Mime: "text/json", - TtlSec: 25, - }, - Chunks: []*filer_pb.FileChunk{ - &filer_pb.FileChunk{ - FileId: "234,2423423422", - Offset: 234234, - Size: 234, - Mtime: 12312423, - ETag: "2342342354", - SourceFileId: "23234,2342342342", - }, - }, - } - - notification := &filer_pb.EventNotification{ - OldEntry: oldEntry.ToProtoEntry(), - NewEntry: nil, - DeleteChunks: true, - } - - text := proto.MarshalTextString(notification) - - notification2 := &filer_pb.EventNotification{} - proto.UnmarshalText(text, notification2) - - if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId { - t.Fatalf("marshal/unmarshal error: %s", text) - } - - println(text) - -} diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go deleted file mode 100644 index dd5abe421..000000000 --- a/weed/filer2/filerstore.go +++ /dev/null @@ -1,208 +0,0 @@ -package filer2 - -import ( - "context" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type FilerStore interface { - // GetName gets the name to locate the configuration in filer.toml file - GetName() string - // Initialize initializes the file store - Initialize(configuration util.Configuration, prefix string) error - InsertEntry(context.Context, *Entry) error - UpdateEntry(context.Context, *Entry) (err error) - // err == filer2.ErrNotFound if not found - FindEntry(context.Context, util.FullPath) (entry *Entry, err error) - DeleteEntry(context.Context, util.FullPath) (err error) - DeleteFolderChildren(context.Context, util.FullPath) (err error) - ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) - ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) - - BeginTransaction(ctx context.Context) (context.Context, error) - CommitTransaction(ctx context.Context) error - RollbackTransaction(ctx context.Context) error - - Shutdown() -} - -type FilerLocalStore interface { - UpdateOffset(filer string, lastTsNs int64) error - ReadOffset(filer string) (lastTsNs int64, err error) -} - -type FilerStoreWrapper struct { - ActualStore FilerStore -} - -func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { - if innerStore, ok := store.(*FilerStoreWrapper); ok { - return innerStore - } - return &FilerStoreWrapper{ - ActualStore: store, - } -} - -func (fsw *FilerStoreWrapper) GetName() string { - return fsw.ActualStore.GetName() -} - -func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { - return fsw.ActualStore.Initialize(configuration, prefix) -} - -func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - if entry.Mime == "application/octet-stream" { - entry.Mime = "" - } - return fsw.ActualStore.InsertEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - if entry.Mime == "application/octet-stream" { - entry.Mime = "" - } - return fsw.ActualStore.UpdateEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds()) - }() - - entry, err = fsw.ActualStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - filer_pb.AfterEntryDeserialization(entry.Chunks) - return -} - -func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - return fsw.ActualStore.DeleteEntry(ctx, fp) -} - -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) - }() - - return fsw.ActualStore.DeleteFolderChildren(ctx, fp) -} - -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds()) - }() - - entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - for _, entry := range entries { - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, err -} - -func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { - stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) - }() - entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - if err == ErrUnsupportedListDirectoryPrefixed { - entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - } - if err != nil { - return nil, err - } - for _, entry := range entries { - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, nil -} - -func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) { - entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - - if prefix == "" { - return - } - - count := 0 - var lastFileName string - notPrefixed := entries - entries = nil - for count < limit && len(notPrefixed) > 0 { - for _, entry := range notPrefixed { - lastFileName = entry.Name() - if strings.HasPrefix(entry.Name(), prefix) { - count++ - entries = append(entries, entry) - if count >= limit { - break - } - } - } - if count < limit { - notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) - if err != nil { - return - } - } - } - return -} - -func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { - return fsw.ActualStore.BeginTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { - return fsw.ActualStore.CommitTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { - return fsw.ActualStore.RollbackTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) Shutdown() { - fsw.ActualStore.Shutdown() -} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go deleted file mode 100644 index 1c08d2831..000000000 --- a/weed/filer2/leveldb/leveldb_store.go +++ /dev/null @@ -1,231 +0,0 @@ -package leveldb - -import ( - "bytes" - "context" - "fmt" - "github.com/syndtr/goleveldb/leveldb" - leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/opt" - leveldb_util "github.com/syndtr/goleveldb/leveldb/util" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - DIR_FILE_SEPARATOR = byte(0x00) -) - -func init() { - filer2.Stores = append(filer2.Stores, &LevelDBStore{}) -} - -type LevelDBStore struct { - db *leveldb.DB -} - -func (store *LevelDBStore) GetName() string { - return "leveldb" -} - -func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - dir := configuration.GetString(prefix + "dir") - return store.initialize(dir) -} - -func (store *LevelDBStore) initialize(dir string) (err error) { - glog.Infof("filer store dir: %s", dir) - if err := weed_util.TestFolderWritable(dir); err != nil { - return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) - } - - opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 10, - } - - if store.db, err = leveldb.OpenFile(dir, opts); err != nil { - if leveldb_errors.IsCorrupted(err) { - store.db, err = leveldb.RecoverFile(dir, opts) - } - if err != nil { - glog.Infof("filer store open dir %s: %v", dir, err) - return - } - } - return -} - -func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *LevelDBStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - key := genKey(entry.DirAndName()) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - err = store.db.Put(key, value, nil) - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) - - return nil -} - -func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { - key := genKey(fullpath.DirAndName()) - - data, err := store.db.Get(key, nil) - - if err == leveldb.ErrNotFound { - return nil, filer_pb.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) - - return entry, nil -} - -func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { - key := genKey(fullpath.DirAndName()) - - err = store.db.Delete(key, nil) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { - - batch := new(leveldb.Batch) - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - batch.Delete([]byte(genKey(string(fullpath), fileName))) - } - iter.Release() - - err = store.db.Write(batch, nil) - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - if fileName == startFileName && !inclusive { - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: weed_util.NewFullPath(string(fullpath), fileName), - } - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - iter.Release() - - return entries, err -} - -func genKey(dirPath, fileName string) (key []byte) { - key = []byte(dirPath) - key = append(key, DIR_FILE_SEPARATOR) - key = append(key, []byte(fileName)...) - return key -} - -func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { - keyPrefix = []byte(string(fullpath)) - keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix -} - -func getNameFromKey(key []byte) string { - - sepIndex := len(key) - 1 - for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR { - sepIndex-- - } - - return string(key[sepIndex+1:]) - -} - -func (store *LevelDBStore) Shutdown() { - store.db.Close() -} diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go deleted file mode 100644 index 86e8aa952..000000000 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package leveldb - -import ( - "context" - "io/ioutil" - "os" - "testing" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) - store := &LevelDBStore{} - store.initialize(dir) - filer.SetStore(store) - - fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") - - ctx := context.Background() - - entry1 := &filer2.Entry{ - FullPath: fullpath, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil { - t.Errorf("create entry %v: %v", entry1.FullPath, err) - return - } - - entry, err := filer.FindEntry(ctx, fullpath) - - if err != nil { - t.Errorf("find entry: %v", err) - return - } - - if entry.FullPath != entry1.FullPath { - t.Errorf("find wrong entry: %v", entry.FullPath) - return - } - - // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "") - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} - -func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) - store := &LevelDBStore{} - store.initialize(dir) - filer.SetStore(store) - - ctx := context.Background() - - // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") - if err != nil { - t.Errorf("list entries: %v", err) - return - } - if len(entries) != 0 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} diff --git a/weed/filer2/leveldb2/leveldb2_local_store.go b/weed/filer2/leveldb2/leveldb2_local_store.go deleted file mode 100644 index 3625abf9e..000000000 --- a/weed/filer2/leveldb2/leveldb2_local_store.go +++ /dev/null @@ -1,43 +0,0 @@ -package leveldb - -import ( - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - _ = filer2.FilerLocalStore(&LevelDB2Store{}) -) - -func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error { - - value := make([]byte, 8) - util.Uint64toBytes(value, uint64(lastTsNs)) - - err := store.dbs[0].Put([]byte("meta"+filer), value, nil) - - if err != nil { - return fmt.Errorf("UpdateOffset %s : %v", filer, err) - } - - println("UpdateOffset", filer, "lastTsNs", lastTsNs) - - return nil -} - -func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) { - - value, err := store.dbs[0].Get([]byte("meta"+filer), nil) - - if err != nil { - return 0, fmt.Errorf("ReadOffset %s : %v", filer, err) - } - - lastTsNs = int64(util.BytesToUint64(value)) - - println("ReadOffset", filer, "lastTsNs", lastTsNs) - - return -} diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go deleted file mode 100644 index ca9d6f04d..000000000 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ /dev/null @@ -1,251 +0,0 @@ -package leveldb - -import ( - "bytes" - "context" - "crypto/md5" - "fmt" - "github.com/syndtr/goleveldb/leveldb" - leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/opt" - leveldb_util "github.com/syndtr/goleveldb/leveldb/util" - "io" - "os" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -func init() { - filer2.Stores = append(filer2.Stores, &LevelDB2Store{}) -} - -type LevelDB2Store struct { - dbs []*leveldb.DB - dbCount int -} - -func (store *LevelDB2Store) GetName() string { - return "leveldb2" -} - -func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - dir := configuration.GetString(prefix + "dir") - return store.initialize(dir, 8) -} - -func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { - glog.Infof("filer store leveldb2 dir: %s", dir) - if err := weed_util.TestFolderWritable(dir); err != nil { - return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) - } - - opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - } - - for d := 0; d < dbCount; d++ { - dbFolder := fmt.Sprintf("%s/%02d", dir, d) - os.MkdirAll(dbFolder, 0755) - db, dbErr := leveldb.OpenFile(dbFolder, opts) - if leveldb_errors.IsCorrupted(dbErr) { - db, dbErr = leveldb.RecoverFile(dbFolder, opts) - } - if dbErr != nil { - glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr) - return dbErr - } - store.dbs = append(store.dbs, db) - } - store.dbCount = dbCount - - return -} - -func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - dir, name := entry.DirAndName() - key, partitionId := genKey(dir, name, store.dbCount) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - err = store.dbs[partitionId].Put(key, value, nil) - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) - - return nil -} - -func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { - dir, name := fullpath.DirAndName() - key, partitionId := genKey(dir, name, store.dbCount) - - data, err := store.dbs[partitionId].Get(key, nil) - - if err == leveldb.ErrNotFound { - return nil, filer_pb.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) - - return entry, nil -} - -func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { - dir, name := fullpath.DirAndName() - key, partitionId := genKey(dir, name, store.dbCount) - - err = store.dbs[partitionId].Delete(key, nil) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { - directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) - - batch := new(leveldb.Batch) - - iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - batch.Delete(append(directoryPrefix, []byte(fileName)...)) - } - iter.Release() - - err = store.dbs[partitionId].Write(batch, nil) - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) - lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount) - - iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - if fileName == startFileName && !inclusive { - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: weed_util.NewFullPath(string(fullpath), fileName), - } - - // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - iter.Release() - - return entries, err -} - -func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) { - key, partitionId = hashToBytes(dirPath, dbCount) - key = append(key, []byte(fileName)...) - return key, partitionId -} - -func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { - keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix, partitionId -} - -func getNameFromKey(key []byte) string { - - return string(key[md5.Size:]) - -} - -// hash directory, and use last byte for partitioning -func hashToBytes(dir string, dbCount int) ([]byte, int) { - h := md5.New() - io.WriteString(h, dir) - - b := h.Sum(nil) - - x := b[len(b)-1] - - return b, int(x) % dbCount -} - -func (store *LevelDB2Store) Shutdown() { - for d := 0; d < store.dbCount; d++ { - store.dbs[d].Close() - } -} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go deleted file mode 100644 index 62d2475fe..000000000 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package leveldb - -import ( - "context" - "io/ioutil" - "os" - "testing" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) - store := &LevelDB2Store{} - store.initialize(dir, 2) - filer.SetStore(store) - - fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") - - ctx := context.Background() - - entry1 := &filer2.Entry{ - FullPath: fullpath, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil { - t.Errorf("create entry %v: %v", entry1.FullPath, err) - return - } - - entry, err := filer.FindEntry(ctx, fullpath) - - if err != nil { - t.Errorf("find entry: %v", err) - return - } - - if entry.FullPath != entry1.FullPath { - t.Errorf("find wrong entry: %v", entry.FullPath) - return - } - - // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "") - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} - -func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) - store := &LevelDB2Store{} - store.initialize(dir, 2) - filer.SetStore(store) - - ctx := context.Background() - - // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "") - if err != nil { - t.Errorf("list entries: %v", err) - return - } - if len(entries) != 0 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go deleted file mode 100644 index f2792bd26..000000000 --- a/weed/filer2/meta_aggregator.go +++ /dev/null @@ -1,131 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "io" - "sync" - "time" - - "github.com/golang/protobuf/proto" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" -) - -type MetaAggregator struct { - filers []string - grpcDialOption grpc.DialOption - MetaLogBuffer *log_buffer.LogBuffer - // notifying clients - ListenersLock sync.Mutex - ListenersCond *sync.Cond -} - -// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. -// The old data comes from what each LocalMetadata persisted on disk. -func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator { - t := &MetaAggregator{ - filers: filers, - grpcDialOption: grpcDialOption, - } - t.ListenersCond = sync.NewCond(&t.ListenersLock) - t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() { - t.ListenersCond.Broadcast() - }) - return t -} - -func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { - for _, filer := range ma.filers { - go ma.subscribeToOneFiler(f, self, filer) - } -} - -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) { - - var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) - lastPersistTime := time.Now() - changesSinceLastPersist := 0 - lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() - - MaxChangeLimit := 100 - - if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok { - if self != filer { - - if prevTsNs, err := localStore.ReadOffset(filer); err == nil { - lastTsNs = prevTsNs - } - - glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) - maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { - if err := Replay(f.Store.ActualStore, event); err != nil { - glog.Errorf("failed to reply metadata change from %v: %v", filer, err) - return - } - changesSinceLastPersist++ - if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) { - if err := localStore.UpdateOffset(filer, event.TsNs); err == nil { - lastPersistTime = time.Now() - changesSinceLastPersist = 0 - } else { - glog.V(0).Infof("failed to update offset for %v: %v", filer, err) - } - } - } - } else { - glog.V(0).Infof("skipping following self: %v", self) - } - } - - processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error { - data, err := proto.Marshal(event) - if err != nil { - glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err) - return err - } - dir := event.Directory - // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) - if maybeReplicateMetadataChange != nil { - maybeReplicateMetadataChange(event) - } - return nil - } - - for { - err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + self, - PathPrefix: "/", - SinceNs: lastTsNs, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return fmt.Errorf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs - } - }) - if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err) - time.Sleep(1733 * time.Millisecond) - } - } -} diff --git a/weed/filer2/meta_replay.go b/weed/filer2/meta_replay.go deleted file mode 100644 index d9cdaa76a..000000000 --- a/weed/filer2/meta_replay.go +++ /dev/null @@ -1,37 +0,0 @@ -package filer2 - -import ( - "context" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - var oldPath util.FullPath - var newEntry *Entry - if message.OldEntry != nil { - oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name) - glog.V(4).Infof("deleting %v", oldPath) - if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil { - return err - } - } - - if message.NewEntry != nil { - dir := resp.Directory - if message.NewParentPath != "" { - dir = message.NewParentPath - } - key := util.NewFullPath(dir, message.NewEntry.Name) - glog.V(4).Infof("creating %v", key) - newEntry = FromPbEntry(dir, message.NewEntry) - if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil { - return err - } - } - - return nil -} diff --git a/weed/filer2/mongodb/mongodb_store.go b/weed/filer2/mongodb/mongodb_store.go deleted file mode 100644 index 661aa4ea0..000000000 --- a/weed/filer2/mongodb/mongodb_store.go +++ /dev/null @@ -1,214 +0,0 @@ -package mongodb - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" - "time" -) - -func init() { - filer2.Stores = append(filer2.Stores, &MongodbStore{}) -} - -type MongodbStore struct { - connect *mongo.Client - database string - collectionName string -} - -type Model struct { - Directory string `bson:"directory"` - Name string `bson:"name"` - Meta []byte `bson:"meta"` -} - -func (store *MongodbStore) GetName() string { - return "mongodb" -} - -func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) { - store.database = configuration.GetString(prefix + "database") - store.collectionName = "filemeta" - poolSize := configuration.GetInt(prefix + "option_pool_size") - return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize)) -} - -func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) { - ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - opts := options.Client().ApplyURI(uri) - - if poolSize > 0 { - opts.SetMaxPoolSize(poolSize) - } - - client, err := mongo.Connect(ctx, opts) - if err != nil { - return err - } - - c := client.Database(store.database).Collection(store.collectionName) - err = store.indexUnique(c) - store.connect = client - return err -} - -func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error { - _, err := c.Indexes().CreateOne(context.Background(), index, opts) - return err -} - -func (store *MongodbStore) indexUnique(c *mongo.Collection) error { - opts := options.CreateIndexes().SetMaxTime(10 * time.Second) - - unique := new(bool) - *unique = true - - index := mongo.IndexModel{ - Keys: bsonx.Doc{{Key: "directory", Value: bsonx.Int32(1)}, {Key: "name", Value: bsonx.Int32(1)}}, - Options: &options.IndexOptions{ - Unique: unique, - }, - } - - return store.createIndex(c, index, opts) -} - -func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} - -func (store *MongodbStore) CommitTransaction(ctx context.Context) error { - return nil -} - -func (store *MongodbStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - c := store.connect.Database(store.database).Collection(store.collectionName) - - _, err = c.InsertOne(ctx, Model{ - Directory: dir, - Name: name, - Meta: meta, - }) - - return nil -} - -func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(ctx, entry) -} - -func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { - - dir, name := fullpath.DirAndName() - var data Model - - var where = bson.M{"directory": dir, "name": name} - err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data) - if err != mongo.ErrNoDocuments && err != nil { - return nil, filer_pb.ErrNotFound - } - - if len(data.Meta) == 0 { - return nil, filer_pb.ErrNotFound - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - - err = entry.DecodeAttributesAndChunks(data.Meta) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - - dir, name := fullpath.DirAndName() - - where := bson.M{"directory": dir, "name": name} - _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - - where := bson.M{"directory": fullpath} - _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - - var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}} - if inclusive { - where["name"] = bson.M{ - "$gte": startFileName, - } - } - optLimit := int64(limit) - opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}} - cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts) - for cur.Next(ctx) { - var data Model - err := cur.Decode(&data) - if err != nil && err != mongo.ErrNoDocuments { - return nil, err - } - - entry := &filer2.Entry{ - FullPath: util.NewFullPath(string(fullpath), data.Name), - } - if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - - entries = append(entries, entry) - } - - if err := cur.Close(ctx); err != nil { - glog.V(0).Infof("list iterator close: %v", err) - } - - return entries, err -} - -func (store *MongodbStore) Shutdown() { - ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - store.connect.Disconnect(ctx) -} diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go deleted file mode 100644 index 9f748445e..000000000 --- a/weed/filer2/mysql/mysql_store.go +++ /dev/null @@ -1,74 +0,0 @@ -package mysql - -import ( - "database/sql" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" - "github.com/chrislusf/seaweedfs/weed/util" - _ "github.com/go-sql-driver/mysql" -) - -const ( - CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8" -) - -func init() { - filer2.Stores = append(filer2.Stores, &MysqlStore{}) -} - -type MysqlStore struct { - abstract_sql.AbstractSqlStore -} - -func (store *MysqlStore) GetName() string { - return "mysql" -} - -func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.initialize( - configuration.GetString(prefix+"username"), - configuration.GetString(prefix+"password"), - configuration.GetString(prefix+"hostname"), - configuration.GetInt(prefix+"port"), - configuration.GetString(prefix+"database"), - configuration.GetInt(prefix+"connection_max_idle"), - configuration.GetInt(prefix+"connection_max_open"), - configuration.GetBool(prefix+"interpolateParams"), - ) -} - -func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int, - interpolateParams bool) (err error) { - // - store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" - store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" - store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" - store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" - store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?" - - sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) - if interpolateParams { - sqlUrl += "&interpolateParams=true" - } - - var dbErr error - store.DB, dbErr = sql.Open("mysql", sqlUrl) - if dbErr != nil { - store.DB.Close() - store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) - } - - store.DB.SetMaxIdleConns(maxIdle) - store.DB.SetMaxOpenConns(maxOpen) - - if err = store.DB.Ping(); err != nil { - return fmt.Errorf("connect to %s error:%v", sqlUrl, err) - } - - return nil -} diff --git a/weed/filer2/permission.go b/weed/filer2/permission.go deleted file mode 100644 index 8a9508fbc..000000000 --- a/weed/filer2/permission.go +++ /dev/null @@ -1,22 +0,0 @@ -package filer2 - -func hasWritePermission(dir *Entry, entry *Entry) bool { - - if dir == nil { - return false - } - - if dir.Uid == entry.Uid && dir.Mode&0200 > 0 { - return true - } - - if dir.Gid == entry.Gid && dir.Mode&0020 > 0 { - return true - } - - if dir.Mode&0002 > 0 { - return true - } - - return false -} diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt deleted file mode 100644 index cb0c99c63..000000000 --- a/weed/filer2/postgres/README.txt +++ /dev/null @@ -1,17 +0,0 @@ - -1. create "seaweedfs" database - -export PGHOME=/Library/PostgreSQL/10 -$PGHOME/bin/createdb --username=postgres --password seaweedfs - -2. create "filemeta" table -$PGHOME/bin/psql --username=postgres --password seaweedfs - -CREATE TABLE IF NOT EXISTS filemeta ( - dirhash BIGINT, - name VARCHAR(65535), - directory VARCHAR(65535), - meta bytea, - PRIMARY KEY (dirhash, name) -); - diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go deleted file mode 100644 index 87eb6aca2..000000000 --- a/weed/filer2/postgres/postgres_store.go +++ /dev/null @@ -1,75 +0,0 @@ -package postgres - -import ( - "database/sql" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" - "github.com/chrislusf/seaweedfs/weed/util" - _ "github.com/lib/pq" -) - -const ( - CONNECTION_URL_PATTERN = "host=%s port=%d user=%s sslmode=%s connect_timeout=30" -) - -func init() { - filer2.Stores = append(filer2.Stores, &PostgresStore{}) -} - -type PostgresStore struct { - abstract_sql.AbstractSqlStore -} - -func (store *PostgresStore) GetName() string { - return "postgres" -} - -func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.initialize( - configuration.GetString(prefix+"username"), - configuration.GetString(prefix+"password"), - configuration.GetString(prefix+"hostname"), - configuration.GetInt(prefix+"port"), - configuration.GetString(prefix+"database"), - configuration.GetString(prefix+"sslmode"), - configuration.GetInt(prefix+"connection_max_idle"), - configuration.GetInt(prefix+"connection_max_open"), - ) -} - -func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) { - - store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)" - store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" - store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" - store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" - store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%')ORDER BY NAME ASC LIMIT $5" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5" - - sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode) - if password != "" { - sqlUrl += " password=" + password - } - if database != "" { - sqlUrl += " dbname=" + database - } - var dbErr error - store.DB, dbErr = sql.Open("postgres", sqlUrl) - if dbErr != nil { - store.DB.Close() - store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) - } - - store.DB.SetMaxIdleConns(maxIdle) - store.DB.SetMaxOpenConns(maxOpen) - - if err = store.DB.Ping(); err != nil { - return fmt.Errorf("connect to %s error:%v", sqlUrl, err) - } - - return nil -} diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go deleted file mode 100644 index 0cea83ff9..000000000 --- a/weed/filer2/reader_at.go +++ /dev/null @@ -1,149 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "io" - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -type ChunkReadAt struct { - masterClient *wdclient.MasterClient - chunkViews []*ChunkView - lookupFileId func(fileId string) (targetUrl string, err error) - readerLock sync.Mutex - fileSize int64 - - chunkCache chunk_cache.ChunkCache -} - -// var _ = io.ReaderAt(&ChunkReadAt{}) - -type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error) - -func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { - return func(fileId string) (targetUrl string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - vid := VolumeId(fileId) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, - }) - if err != nil { - return err - } - - locations := resp.LocationsMap[vid] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", fileId) - return fmt.Errorf("failed to locate %s", fileId) - } - - volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) - - targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) - - return nil - }) - return - } -} - -func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { - - return &ChunkReadAt{ - chunkViews: chunkViews, - lookupFileId: LookupFn(filerClient), - chunkCache: chunkCache, - fileSize: fileSize, - } -} - -func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { - - c.readerLock.Lock() - defer c.readerLock.Unlock() - - glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) - return c.doReadAt(p[n:], offset+int64(n)) -} - -func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { - - var buffer []byte - startOffset, remaining := offset, int64(len(p)) - for i, chunk := range c.chunkViews { - if remaining <= 0 { - break - } - if startOffset < chunk.LogicOffset { - gap := int(chunk.LogicOffset - startOffset) - glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap)) - n += int(min(int64(gap), remaining)) - startOffset, remaining = chunk.LogicOffset, remaining-int64(gap) - if remaining <= 0 { - break - } - } - // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) - chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining) - if chunkStart >= chunkStop { - continue - } - glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - buffer, err = c.readFromWholeChunkData(chunk) - if err != nil { - glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - return - } - bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset - copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart]) - n += copied - startOffset, remaining = startOffset+int64(copied), remaining-int64(copied) - } - - glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err) - - if err == nil && remaining > 0 && c.fileSize > startOffset { - delta := int(min(remaining, c.fileSize-startOffset)) - glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize) - n += delta - } - - if err == nil && offset+int64(len(p)) > c.fileSize { - err = io.EOF - } - // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err) - - return - -} - -func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) { - - glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) - - chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) - if chunkData != nil { - glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) - } else { - glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId) - chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) - if err != nil { - return - } - c.chunkCache.SetChunk(chunkView.FileId, chunkData) - } - - return -} - -func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { - - return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped) - -} diff --git a/weed/filer2/reader_at_test.go b/weed/filer2/reader_at_test.go deleted file mode 100644 index 7bfc9a972..000000000 --- a/weed/filer2/reader_at_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package filer2 - -import ( - "fmt" - "io" - "math" - "strconv" - "sync" - "testing" -) - -type mockChunkCache struct { -} - -func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { - x, _ := strconv.Atoi(fileId) - data = make([]byte, minSize) - for i := 0; i < int(minSize); i++ { - data[i] = byte(x) - } - return data -} -func (m *mockChunkCache) SetChunk(fileId string, data []byte) { -} - -func TestReaderAt(t *testing.T) { - - visibles := []VisibleInterval{ - { - start: 1, - stop: 2, - fileId: "1", - chunkSize: 9, - }, - { - start: 3, - stop: 4, - fileId: "3", - chunkSize: 1, - }, - { - start: 5, - stop: 6, - fileId: "5", - chunkSize: 2, - }, - { - start: 7, - stop: 9, - fileId: "7", - chunkSize: 2, - }, - { - start: 9, - stop: 10, - fileId: "9", - chunkSize: 2, - }, - } - - readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 10, - chunkCache: &mockChunkCache{}, - } - - testReadAt(t, readerAt, 0, 10, 10, nil) - testReadAt(t, readerAt, 0, 12, 10, io.EOF) - testReadAt(t, readerAt, 2, 8, 8, nil) - testReadAt(t, readerAt, 3, 6, 6, nil) - -} - -func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) { - data := make([]byte, size) - n, err := readerAt.ReadAt(data, offset) - - for _, d := range data { - fmt.Printf("%x", d) - } - fmt.Println() - - if expected != n { - t.Errorf("unexpected read size: %d, expect: %d", n, expected) - } - if err != expectedErr { - t.Errorf("unexpected read error: %v, expect: %v", err, expectedErr) - } - -} - -func TestReaderAt0(t *testing.T) { - - visibles := []VisibleInterval{ - { - start: 2, - stop: 5, - fileId: "1", - chunkSize: 9, - }, - { - start: 7, - stop: 9, - fileId: "2", - chunkSize: 9, - }, - } - - readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 10, - chunkCache: &mockChunkCache{}, - } - - testReadAt(t, readerAt, 0, 10, 10, nil) - testReadAt(t, readerAt, 3, 16, 7, io.EOF) - testReadAt(t, readerAt, 3, 5, 5, nil) - - testReadAt(t, readerAt, 11, 5, 0, io.EOF) - testReadAt(t, readerAt, 10, 5, 0, io.EOF) - -} - -func TestReaderAt1(t *testing.T) { - - visibles := []VisibleInterval{ - { - start: 2, - stop: 5, - fileId: "1", - chunkSize: 9, - }, - } - - readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 20, - chunkCache: &mockChunkCache{}, - } - - testReadAt(t, readerAt, 0, 20, 20, nil) - testReadAt(t, readerAt, 1, 7, 7, nil) - testReadAt(t, readerAt, 0, 1, 1, nil) - testReadAt(t, readerAt, 18, 4, 2, io.EOF) - testReadAt(t, readerAt, 12, 4, 4, nil) - testReadAt(t, readerAt, 4, 20, 16, io.EOF) - testReadAt(t, readerAt, 4, 10, 10, nil) - testReadAt(t, readerAt, 1, 10, 10, nil) - -} diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go deleted file mode 100644 index eaaecb740..000000000 --- a/weed/filer2/redis/redis_cluster_store.go +++ /dev/null @@ -1,42 +0,0 @@ -package redis - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" -) - -func init() { - filer2.Stores = append(filer2.Stores, &RedisClusterStore{}) -} - -type RedisClusterStore struct { - UniversalRedisStore -} - -func (store *RedisClusterStore) GetName() string { - return "redis_cluster" -} - -func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { - - configuration.SetDefault(prefix+"useReadOnly", true) - configuration.SetDefault(prefix+"routeByLatency", true) - - return store.initialize( - configuration.GetStringSlice(prefix+"addresses"), - configuration.GetString(prefix+"password"), - configuration.GetBool(prefix+"useReadOnly"), - configuration.GetBool(prefix+"routeByLatency"), - ) -} - -func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { - store.Client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: addresses, - Password: password, - ReadOnly: readOnly, - RouteByLatency: routeByLatency, - }) - return -} diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go deleted file mode 100644 index 9debdb070..000000000 --- a/weed/filer2/redis/redis_store.go +++ /dev/null @@ -1,36 +0,0 @@ -package redis - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" -) - -func init() { - filer2.Stores = append(filer2.Stores, &RedisStore{}) -} - -type RedisStore struct { - UniversalRedisStore -} - -func (store *RedisStore) GetName() string { - return "redis" -} - -func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.initialize( - configuration.GetString(prefix+"address"), - configuration.GetString(prefix+"password"), - configuration.GetInt(prefix+"database"), - ) -} - -func (store *RedisStore) initialize(hostPort string, password string, database int) (err error) { - store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Password: password, - DB: database, - }) - return -} diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go deleted file mode 100644 index fc2abef6c..000000000 --- a/weed/filer2/redis/universal_redis_store.go +++ /dev/null @@ -1,191 +0,0 @@ -package redis - -import ( - "context" - "fmt" - "sort" - "strings" - "time" - - "github.com/go-redis/redis" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - DIR_LIST_MARKER = "\x00" -) - -type UniversalRedisStore struct { - Client redis.UniversalClient -} - -func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - dir, name := entry.FullPath.DirAndName() - if name != "" { - _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) - } - } - - return nil -} - -func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { - - data, err := store.Client.Get(string(fullpath)).Result() - if err == redis.Nil { - return nil, filer_pb.ErrNotFound - } - - if err != nil { - return nil, fmt.Errorf("get %s : %v", fullpath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks([]byte(data)) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { - - _, err = store.Client.Del(string(fullpath)).Result() - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - dir, name := fullpath.DirAndName() - if name != "" { - _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) - } - } - - return nil -} - -func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { - - members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() - if err != nil { - return fmt.Errorf("delete folder %s : %v", fullpath, err) - } - - for _, fileName := range members { - path := util.NewFullPath(string(fullpath), fileName) - _, err = store.Client.Del(string(path)).Result() - if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) - } - } - - return nil -} - -func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - dirListKey := genDirectoryListKey(string(fullpath)) - members, err := store.Client.SMembers(dirListKey).Result() - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - - // skip - if startFileName != "" { - var t []string - for _, m := range members { - if strings.Compare(m, startFileName) >= 0 { - if m == startFileName { - if inclusive { - t = append(t, m) - } - } else { - t = append(t, m) - } - } - } - members = t - } - - // sort - sort.Slice(members, func(i, j int) bool { - return strings.Compare(members[i], members[j]) < 0 - }) - - // limit - if limit < len(members) { - members = members[:limit] - } - - // fetch entry meta - for _, fileName := range members { - path := util.NewFullPath(string(fullpath), fileName) - entry, err := store.FindEntry(ctx, path) - if err != nil { - glog.V(0).Infof("list %s : %v", path, err) - } else { - if entry.TtlSec > 0 { - if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - store.Client.Del(string(path)).Result() - store.Client.SRem(dirListKey, fileName).Result() - continue - } - } - entries = append(entries, entry) - } - } - - return entries, err -} - -func genDirectoryListKey(dir string) (dirList string) { - return dir + DIR_LIST_MARKER -} - -func (store *UniversalRedisStore) Shutdown() { - store.Client.Close() -} diff --git a/weed/filer2/redis2/redis_cluster_store.go b/weed/filer2/redis2/redis_cluster_store.go deleted file mode 100644 index b252eabab..000000000 --- a/weed/filer2/redis2/redis_cluster_store.go +++ /dev/null @@ -1,42 +0,0 @@ -package redis2 - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" -) - -func init() { - filer2.Stores = append(filer2.Stores, &RedisCluster2Store{}) -} - -type RedisCluster2Store struct { - UniversalRedis2Store -} - -func (store *RedisCluster2Store) GetName() string { - return "redis_cluster2" -} - -func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) { - - configuration.SetDefault(prefix+"useReadOnly", true) - configuration.SetDefault(prefix+"routeByLatency", true) - - return store.initialize( - configuration.GetStringSlice(prefix+"addresses"), - configuration.GetString(prefix+"password"), - configuration.GetBool(prefix+"useReadOnly"), - configuration.GetBool(prefix+"routeByLatency"), - ) -} - -func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { - store.Client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: addresses, - Password: password, - ReadOnly: readOnly, - RouteByLatency: routeByLatency, - }) - return -} diff --git a/weed/filer2/redis2/redis_store.go b/weed/filer2/redis2/redis_store.go deleted file mode 100644 index 1e2a20043..000000000 --- a/weed/filer2/redis2/redis_store.go +++ /dev/null @@ -1,36 +0,0 @@ -package redis2 - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" -) - -func init() { - filer2.Stores = append(filer2.Stores, &Redis2Store{}) -} - -type Redis2Store struct { - UniversalRedis2Store -} - -func (store *Redis2Store) GetName() string { - return "redis2" -} - -func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.initialize( - configuration.GetString(prefix+"address"), - configuration.GetString(prefix+"password"), - configuration.GetInt(prefix+"database"), - ) -} - -func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { - store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Password: password, - DB: database, - }) - return -} diff --git a/weed/filer2/redis2/universal_redis_store.go b/weed/filer2/redis2/universal_redis_store.go deleted file mode 100644 index c639635ef..000000000 --- a/weed/filer2/redis2/universal_redis_store.go +++ /dev/null @@ -1,166 +0,0 @@ -package redis2 - -import ( - "context" - "fmt" - "time" - - "github.com/go-redis/redis" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - DIR_LIST_MARKER = "\x00" -) - -type UniversalRedis2Store struct { - Client redis.UniversalClient -} - -func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - dir, name := entry.FullPath.DirAndName() - if name != "" { - if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { - return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) - } - } - - return nil -} - -func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { - - data, err := store.Client.Get(string(fullpath)).Result() - if err == redis.Nil { - return nil, filer_pb.ErrNotFound - } - - if err != nil { - return nil, fmt.Errorf("get %s : %v", fullpath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks([]byte(data)) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { - - _, err = store.Client.Del(string(fullpath)).Result() - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - dir, name := fullpath.DirAndName() - if name != "" { - _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) - } - } - - return nil -} - -func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { - - members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() - if err != nil { - return fmt.Errorf("delete folder %s : %v", fullpath, err) - } - - for _, fileName := range members { - path := util.NewFullPath(string(fullpath), fileName) - _, err = store.Client.Del(string(path)).Result() - if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) - } - } - - return nil -} - -func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { - return nil, filer2.ErrUnsupportedListDirectoryPrefixed -} - -func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - dirListKey := genDirectoryListKey(string(fullpath)) - start := int64(0) - if startFileName != "" { - start, _ = store.Client.ZRank(dirListKey, startFileName).Result() - if !inclusive { - start++ - } - } - members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result() - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - - // fetch entry meta - for _, fileName := range members { - path := util.NewFullPath(string(fullpath), fileName) - entry, err := store.FindEntry(ctx, path) - if err != nil { - glog.V(0).Infof("list %s : %v", path, err) - } else { - if entry.TtlSec > 0 { - if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - store.Client.Del(string(path)).Result() - store.Client.ZRem(dirListKey, fileName).Result() - continue - } - } - entries = append(entries, entry) - } - } - - return entries, err -} - -func genDirectoryListKey(dir string) (dirList string) { - return dir + DIR_LIST_MARKER -} - -func (store *UniversalRedis2Store) Shutdown() { - store.Client.Close() -} diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go deleted file mode 100644 index fee9d45da..000000000 --- a/weed/filer2/stream.go +++ /dev/null @@ -1,204 +0,0 @@ -package filer2 - -import ( - "bytes" - "io" - "math" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - - // fmt.Printf("start to stream content for chunks: %+v\n", chunks) - chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) - - fileId2Url := make(map[string]string) - - for _, chunkView := range chunkViews { - - urlString, err := masterClient.LookupFileId(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return err - } - fileId2Url[chunkView.FileId] = urlString - } - - for _, chunkView := range chunkViews { - - urlString := fileId2Url[chunkView.FileId] - err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err - } - } - - return nil - -} - -// ---------------- ReadAllReader ---------------------------------- - -func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { - - buffer := bytes.Buffer{} - - lookupFileIdFn := func(fileId string) (targetUrl string, err error) { - return masterClient.LookupFileId(fileId) - } - - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - for _, chunkView := range chunkViews { - urlString, err := lookupFileIdFn(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return nil, err - } - err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return nil, err - } - } - return buffer.Bytes(), nil -} - -// ---------------- ChunkStreamReader ---------------------------------- -type ChunkStreamReader struct { - chunkViews []*ChunkView - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferPos int - chunkIndex int - lookupFileId LookupFileIdFunctionType -} - -var _ = io.ReadSeeker(&ChunkStreamReader{}) - -func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - - lookupFileIdFn := func(fileId string) (targetUrl string, err error) { - return masterClient.LookupFileId(fileId) - } - - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - } -} - -func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - - lookupFileIdFn := LookupFn(filerClient) - - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - } -} - -func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { - for n < len(p) { - if c.isBufferEmpty() { - if c.chunkIndex >= len(c.chunkViews) { - return n, io.EOF - } - chunkView := c.chunkViews[c.chunkIndex] - c.fetchChunkToBuffer(chunkView) - c.chunkIndex++ - } - t := copy(p[n:], c.buffer[c.bufferPos:]) - c.bufferPos += t - n += t - } - return -} - -func (c *ChunkStreamReader) isBufferEmpty() bool { - return len(c.buffer) <= c.bufferPos -} - -func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { - - var totalSize int64 - for _, chunk := range c.chunkViews { - totalSize += int64(chunk.Size) - } - - var err error - switch whence { - case io.SeekStart: - case io.SeekCurrent: - offset += c.bufferOffset + int64(c.bufferPos) - case io.SeekEnd: - offset = totalSize + offset - } - if offset > totalSize { - err = io.ErrUnexpectedEOF - } - - for i, chunk := range c.chunkViews { - if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) - c.chunkIndex = i + 1 - break - } - } - } - c.bufferPos = int(offset - c.bufferOffset) - - return offset, err - -} - -func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { - urlString, err := c.lookupFileId(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return err - } - var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err - } - c.buffer = buffer.Bytes() - c.bufferPos = 0 - c.bufferOffset = chunkView.LogicOffset - - // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) - - return nil -} - -func (c *ChunkStreamReader) Close() { - // TODO try to release and reuse buffer -} - -func VolumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go deleted file mode 100644 index 9c6e5c88d..000000000 --- a/weed/filer2/topics.go +++ /dev/null @@ -1,6 +0,0 @@ -package filer2 - -const ( - TopicsDir = "/topics" - SystemLogDir = TopicsDir + "/.system/log" -) |
