diff options
Diffstat (limited to 'weed/filer2')
32 files changed, 0 insertions, 3727 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go deleted file mode 100644 index d512467c7..000000000 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ /dev/null @@ -1,184 +0,0 @@ -package abstract_sql - -import ( - "context" - "database/sql" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" -) - -type AbstractSqlStore struct { - DB *sql.DB - SqlInsert string - SqlUpdate string - SqlFind string - SqlDelete string - SqlDeleteFolderChildren string - SqlListExclusive string - SqlListInclusive string -} - -type TxOrDB interface { - ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) - QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row - QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) -} - -func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) { - tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ - Isolation: sql.LevelReadCommitted, - ReadOnly: false, - }) - if err != nil { - return ctx, err - } - - return context.WithValue(ctx, "tx", tx), nil -} -func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error { - if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx.Commit() - } - return nil -} -func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { - if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx.Rollback() - } - return nil -} - -func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB { - if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx - } - return store.DB -} - -func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta) - if err != nil { - return fmt.Errorf("insert %s: %s", entry.FullPath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("insert %s but no rows affected: %s", entry.FullPath, err) - } - return nil -} - -func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("update %s: %s", entry.FullPath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) - } - return nil -} - -func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { - - dir, name := fullpath.DirAndName() - row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir) - var data []byte - if err := row.Scan(&data); err != nil { - return nil, filer2.ErrNotFound - } - - entry := &filer2.Entry{ - FullPath: fullpath, - } - if err := entry.DecodeAttributesAndChunks(data); err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { - - dir, name := fullpath.DirAndName() - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("delete %s: %s", fullpath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) - } - - return nil -} - -func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { - - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, hashToLong(string(fullpath)), fullpath) - if err != nil { - return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) - } - - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) - } - - return nil -} - -func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - - sqlText := store.SqlListExclusive - if inclusive { - sqlText = store.SqlListInclusive - } - - rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - defer rows.Close() - - for rows.Next() { - var name string - var data []byte - if err = rows.Scan(&name, &data); err != nil { - glog.V(0).Infof("scan %s : %v", fullpath, err) - return nil, fmt.Errorf("scan %s: %v", fullpath, err) - } - - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), name), - } - if err = entry.DecodeAttributesAndChunks(data); err != nil { - glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) - return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) - } - - entries = append(entries, entry) - } - - return entries, nil -} diff --git a/weed/filer2/abstract_sql/hashing.go b/weed/filer2/abstract_sql/hashing.go deleted file mode 100644 index 5c982c537..000000000 --- a/weed/filer2/abstract_sql/hashing.go +++ /dev/null @@ -1,32 +0,0 @@ -package abstract_sql - -import ( - "crypto/md5" - "io" -) - -// returns a 64 bit big int -func hashToLong(dir string) (v int64) { - h := md5.New() - io.WriteString(h, dir) - - b := h.Sum(nil) - - v += int64(b[0]) - v <<= 8 - v += int64(b[1]) - v <<= 8 - v += int64(b[2]) - v <<= 8 - v += int64(b[3]) - v <<= 8 - v += int64(b[4]) - v <<= 8 - v += int64(b[5]) - v <<= 8 - v += int64(b[6]) - v <<= 8 - v += int64(b[7]) - - return -} diff --git a/weed/filer2/cassandra/README.txt b/weed/filer2/cassandra/README.txt deleted file mode 100644 index 122c9c3f4..000000000 --- a/weed/filer2/cassandra/README.txt +++ /dev/null @@ -1,14 +0,0 @@ -1. create a keyspace - -CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1}; - -2. create filemeta table - - USE seaweedfs; - - CREATE TABLE filemeta ( - directory varchar, - name varchar, - meta blob, - PRIMARY KEY (directory, name) - ) WITH CLUSTERING ORDER BY (name ASC); diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go deleted file mode 100644 index dcaab8bc4..000000000 --- a/weed/filer2/cassandra/cassandra_store.go +++ /dev/null @@ -1,153 +0,0 @@ -package cassandra - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gocql/gocql" -) - -func init() { - filer2.Stores = append(filer2.Stores, &CassandraStore{}) -} - -type CassandraStore struct { - cluster *gocql.ClusterConfig - session *gocql.Session -} - -func (store *CassandraStore) GetName() string { - return "cassandra" -} - -func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) { - return store.initialize( - configuration.GetString("keyspace"), - configuration.GetStringSlice("hosts"), - ) -} - -func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) { - store.cluster = gocql.NewCluster(hosts...) - store.cluster.Keyspace = keyspace - store.cluster.Consistency = gocql.LocalQuorum - store.session, err = store.cluster.CreateSession() - if err != nil { - glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) - } - return -} - -func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *CassandraStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - dir, name := entry.FullPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } - - if err := store.session.Query( - "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ", - dir, name, meta, entry.TtlSec).Exec(); err != nil { - return fmt.Errorf("insert %s: %s", entry.FullPath, err) - } - - return nil -} - -func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - - dir, name := fullpath.DirAndName() - var data []byte - if err := store.session.Query( - "SELECT meta FROM filemeta WHERE directory=? AND name=?", - dir, name).Consistency(gocql.One).Scan(&data); err != nil { - if err != gocql.ErrNotFound { - return nil, filer2.ErrNotFound - } - } - - if len(data) == 0 { - return nil, filer2.ErrNotFound - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { - - dir, name := fullpath.DirAndName() - - if err := store.session.Query( - "DELETE FROM filemeta WHERE directory=? AND name=?", - dir, name).Exec(); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { - - if err := store.session.Query( - "DELETE FROM filemeta WHERE directory=?", - fullpath).Exec(); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" - if inclusive { - cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" - } - - var data []byte - var name string - iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter() - for iter.Scan(&name, &data) { - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), name), - } - if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - if err := iter.Close(); err != nil { - glog.V(0).Infof("list iterator close: %v", err) - } - - return entries, err -} diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go deleted file mode 100644 index 7b05b53dc..000000000 --- a/weed/filer2/configuration.go +++ /dev/null @@ -1,51 +0,0 @@ -package filer2 - -import ( - "os" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/spf13/viper" -) - -var ( - Stores []FilerStore -) - -func (f *Filer) LoadConfiguration(config *viper.Viper) { - - validateOneEnabledStore(config) - - for _, store := range Stores { - if config.GetBool(store.GetName() + ".enabled") { - viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize store for %s: %+v", - store.GetName(), err) - } - f.SetStore(store) - glog.V(0).Infof("Configure filer for %s", store.GetName()) - return - } - } - - println() - println("Supported filer stores are:") - for _, store := range Stores { - println(" " + store.GetName()) - } - - os.Exit(-1) -} - -func validateOneEnabledStore(config *viper.Viper) { - enabledStore := "" - for _, store := range Stores { - if config.GetBool(store.GetName() + ".enabled") { - if enabledStore == "" { - enabledStore = store.GetName() - } else { - glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName()) - } - } - } -} diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go deleted file mode 100644 index c901927bb..000000000 --- a/weed/filer2/entry.go +++ /dev/null @@ -1,73 +0,0 @@ -package filer2 - -import ( - "os" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type Attr struct { - Mtime time.Time // time of last modification - Crtime time.Time // time of creation (OS X only) - Mode os.FileMode // file mode - Uid uint32 // owner uid - Gid uint32 // group gid - Mime string // mime type - Replication string // replication - Collection string // collection name - TtlSec int32 // ttl in seconds - UserName string - GroupNames []string - SymlinkTarget string -} - -func (attr Attr) IsDirectory() bool { - return attr.Mode&os.ModeDir > 0 -} - -type Entry struct { - FullPath - - Attr - Extended map[string][]byte - - // the following is for files - Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` -} - -func (entry *Entry) Size() uint64 { - return TotalSize(entry.Chunks) -} - -func (entry *Entry) Timestamp() time.Time { - if entry.IsDirectory() { - return entry.Crtime - } else { - return entry.Mtime - } -} - -func (entry *Entry) ToProtoEntry() *filer_pb.Entry { - if entry == nil { - return nil - } - return &filer_pb.Entry{ - Name: entry.FullPath.Name(), - IsDirectory: entry.IsDirectory(), - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, - } -} - -func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { - if entry == nil { - return nil - } - dir, _ := entry.FullPath.DirAndName() - return &filer_pb.FullEntry{ - Dir: dir, - Entry: entry.ToProtoEntry(), - } -} diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go deleted file mode 100644 index 3a2dc6134..000000000 --- a/weed/filer2/entry_codec.go +++ /dev/null @@ -1,116 +0,0 @@ -package filer2 - -import ( - "bytes" - "fmt" - "os" - "time" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { - message := &filer_pb.Entry{ - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, - } - return proto.Marshal(message) -} - -func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { - - message := &filer_pb.Entry{} - - if err := proto.UnmarshalMerge(blob, message); err != nil { - return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err) - } - - entry.Attr = PbToEntryAttribute(message.Attributes) - - entry.Extended = message.Extended - - entry.Chunks = message.Chunks - - return nil -} - -func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { - - return &filer_pb.FuseAttributes{ - Crtime: entry.Attr.Crtime.Unix(), - Mtime: entry.Attr.Mtime.Unix(), - FileMode: uint32(entry.Attr.Mode), - Uid: entry.Uid, - Gid: entry.Gid, - Mime: entry.Mime, - Collection: entry.Attr.Collection, - Replication: entry.Attr.Replication, - TtlSec: entry.Attr.TtlSec, - UserName: entry.Attr.UserName, - GroupName: entry.Attr.GroupNames, - SymlinkTarget: entry.Attr.SymlinkTarget, - } -} - -func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { - - t := Attr{} - - t.Crtime = time.Unix(attr.Crtime, 0) - t.Mtime = time.Unix(attr.Mtime, 0) - t.Mode = os.FileMode(attr.FileMode) - t.Uid = attr.Uid - t.Gid = attr.Gid - t.Mime = attr.Mime - t.Collection = attr.Collection - t.Replication = attr.Replication - t.TtlSec = attr.TtlSec - t.UserName = attr.UserName - t.GroupNames = attr.GroupName - t.SymlinkTarget = attr.SymlinkTarget - - return t -} - -func EqualEntry(a, b *Entry) bool { - if a == b { - return true - } - if a == nil && b != nil || a != nil && b == nil { - return false - } - if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) { - return false - } - if len(a.Chunks) != len(b.Chunks) { - return false - } - - if !eq(a.Extended, b.Extended) { - return false - } - - for i := 0; i < len(a.Chunks); i++ { - if !proto.Equal(a.Chunks[i], b.Chunks[i]) { - return false - } - } - return true -} - -func eq(a, b map[string][]byte) bool { - if len(a) != len(b) { - return false - } - - for k, v := range a { - if w, ok := b[k]; !ok || !bytes.Equal(v, w) { - return false - } - } - - return true -} diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go deleted file mode 100644 index 2eb9e3e86..000000000 --- a/weed/filer2/etcd/etcd_store.go +++ /dev/null @@ -1,196 +0,0 @@ -package etcd - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" - "go.etcd.io/etcd/clientv3" -) - -const ( - DIR_FILE_SEPARATOR = byte(0x00) -) - -func init() { - filer2.Stores = append(filer2.Stores, &EtcdStore{}) -} - -type EtcdStore struct { - client *clientv3.Client -} - -func (store *EtcdStore) GetName() string { - return "etcd" -} - -func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) { - servers := configuration.GetString("servers") - if servers == "" { - servers = "localhost:2379" - } - - timeout := configuration.GetString("timeout") - if timeout == "" { - timeout = "3s" - } - - return store.initialize(servers, timeout) -} - -func (store *EtcdStore) initialize(servers string, timeout string) (err error) { - glog.Infof("filer store etcd: %s", servers) - - to, err := time.ParseDuration(timeout) - if err != nil { - return fmt.Errorf("parse timeout %s: %s", timeout, err) - } - - store.client, err = clientv3.New(clientv3.Config{ - Endpoints: strings.Split(servers, ","), - DialTimeout: to, - }) - if err != nil { - return fmt.Errorf("connect to etcd %s: %s", servers, err) - } - - return -} - -func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *EtcdStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *EtcdStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - key := genKey(entry.DirAndName()) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - if _, err := store.client.Put(ctx, string(key), string(value)); err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - return nil -} - -func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(ctx, entry) -} - -func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - key := genKey(fullpath.DirAndName()) - - resp, err := store.client.Get(ctx, string(key)) - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - if len(resp.Kvs) == 0 { - return nil, filer2.ErrNotFound - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - key := genKey(fullpath.DirAndName()) - - if _, err := store.client.Delete(ctx, string(key)); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { - return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err) - } - - return nil -} - -func (store *EtcdStore) ListDirectoryEntries( - ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int, -) (entries []*filer2.Entry, err error) { - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - resp, err := store.client.Get(ctx, string(directoryPrefix), - clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - - for _, kv := range resp.Kvs { - fileName := getNameFromKey(kv.Key) - if fileName == "" { - continue - } - if fileName == startFileName && !inclusive { - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), - } - if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - - return entries, err -} - -func genKey(dirPath, fileName string) (key []byte) { - key = []byte(dirPath) - key = append(key, DIR_FILE_SEPARATOR) - key = append(key, []byte(fileName)...) - return key -} - -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { - keyPrefix = []byte(string(fullpath)) - keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix -} - -func getNameFromKey(key []byte) string { - sepIndex := len(key) - 1 - for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR { - sepIndex-- - } - - return string(key[sepIndex+1:]) -} diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go deleted file mode 100644 index b5876df82..000000000 --- a/weed/filer2/filechunks.go +++ /dev/null @@ -1,228 +0,0 @@ -package filer2 - -import ( - "fmt" - "hash/fnv" - "sort" - "sync" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { - for _, c := range chunks { - t := uint64(c.Offset + int64(c.Size)) - if size < t { - size = t - } - } - return -} - -func ETag(chunks []*filer_pb.FileChunk) (etag string) { - if len(chunks) == 1 { - return chunks[0].ETag - } - - h := fnv.New32a() - for _, c := range chunks { - h.Write([]byte(c.ETag)) - } - return fmt.Sprintf("%x", h.Sum32()) -} - -func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - - visibles := NonOverlappingVisibleIntervals(chunks) - - fileIds := make(map[string]bool) - for _, interval := range visibles { - fileIds[interval.fileId] = true - } - for _, chunk := range chunks { - if _, found := fileIds[chunk.GetFileIdString()]; found { - compacted = append(compacted, chunk) - } else { - garbage = append(garbage, chunk) - } - } - - return -} - -func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { - - fileIds := make(map[string]bool) - for _, interval := range bs { - fileIds[interval.GetFileIdString()] = true - } - for _, chunk := range as { - if _, found := fileIds[chunk.GetFileIdString()]; !found { - delta = append(delta, chunk) - } - } - - return -} - -type ChunkView struct { - FileId string - Offset int64 - Size uint64 - LogicOffset int64 - IsFullChunk bool -} - -func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) { - - visibles := NonOverlappingVisibleIntervals(chunks) - - return ViewFromVisibleIntervals(visibles, offset, size) - -} - -func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) { - - stop := offset + int64(size) - - for _, chunk := range visibles { - if chunk.start <= offset && offset < chunk.stop && offset < stop { - isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop - views = append(views, &ChunkView{ - FileId: chunk.fileId, - Offset: offset - chunk.start, // offset is the data starting location in this file id - Size: uint64(min(chunk.stop, stop) - offset), - LogicOffset: offset, - IsFullChunk: isFullChunk, - }) - offset = min(chunk.stop, stop) - } - } - - return views - -} - -func logPrintf(name string, visibles []VisibleInterval) { - /* - log.Printf("%s len %d", name, len(visibles)) - for _, v := range visibles { - log.Printf("%s: => %+v", name, v) - } - */ -} - -var bufPool = sync.Pool{ - New: func() interface{} { - return new(VisibleInterval) - }, -} - -func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { - - newV := newVisibleInterval( - chunk.Offset, - chunk.Offset+int64(chunk.Size), - chunk.GetFileIdString(), - chunk.Mtime, - true, - ) - - length := len(visibles) - if length == 0 { - return append(visibles, newV) - } - last := visibles[length-1] - if last.stop <= chunk.Offset { - return append(visibles, newV) - } - - logPrintf(" before", visibles) - for _, v := range visibles { - if v.start < chunk.Offset && chunk.Offset < v.stop { - newVisibles = append(newVisibles, newVisibleInterval( - v.start, - chunk.Offset, - v.fileId, - v.modifiedTime, - false, - )) - } - chunkStop := chunk.Offset + int64(chunk.Size) - if v.start < chunkStop && chunkStop < v.stop { - newVisibles = append(newVisibles, newVisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false, - )) - } - if chunkStop <= v.start || v.stop <= chunk.Offset { - newVisibles = append(newVisibles, v) - } - } - newVisibles = append(newVisibles, newV) - - logPrintf(" append", newVisibles) - - for i := len(newVisibles) - 1; i >= 0; i-- { - if i > 0 && newV.start < newVisibles[i-1].start { - newVisibles[i] = newVisibles[i-1] - } else { - newVisibles[i] = newV - break - } - } - logPrintf(" sorted", newVisibles) - - return newVisibles -} - -func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { - - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].Mtime < chunks[j].Mtime - }) - - var newVisibles []VisibleInterval - for _, chunk := range chunks { - newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk) - t := visibles[:0] - visibles = newVisibles - newVisibles = t - - logPrintf("add", visibles) - - } - - return -} - -// find non-overlapping visible intervals -// visible interval map to one file chunk - -type VisibleInterval struct { - start int64 - stop int64 - modifiedTime int64 - fileId string - isFullChunk bool -} - -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval { - return VisibleInterval{ - start: start, - stop: stop, - fileId: fileId, - modifiedTime: modifiedTime, - isFullChunk: isFullChunk, - } -} - -func min(x, y int64) int64 { - if x <= y { - return x - } - return y -} diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go deleted file mode 100644 index e75e60753..000000000 --- a/weed/filer2/filechunks_test.go +++ /dev/null @@ -1,384 +0,0 @@ -package filer2 - -import ( - "log" - "testing" - - "fmt" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func TestCompactFileChunks(t *testing.T) { - chunks := []*filer_pb.FileChunk{ - {Offset: 10, Size: 100, FileId: "abc", Mtime: 50}, - {Offset: 100, Size: 100, FileId: "def", Mtime: 100}, - {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200}, - {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300}, - } - - compacted, garbage := CompactFileChunks(chunks) - - if len(compacted) != 3 { - t.Fatalf("unexpected compacted: %d", len(compacted)) - } - if len(garbage) != 1 { - t.Fatalf("unexpected garbage: %d", len(garbage)) - } - -} - -func TestCompactFileChunks2(t *testing.T) { - - chunks := []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 50}, - {Offset: 100, Size: 100, FileId: "def", Mtime: 100}, - {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200}, - {Offset: 0, Size: 100, FileId: "abcf", Mtime: 300}, - {Offset: 50, Size: 100, FileId: "fhfh", Mtime: 400}, - {Offset: 100, Size: 100, FileId: "yuyu", Mtime: 500}, - } - - k := 3 - - for n := 0; n < k; n++ { - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n), - }) - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k), - }) - } - - compacted, garbage := CompactFileChunks(chunks) - - if len(compacted) != 4 { - t.Fatalf("unexpected compacted: %d", len(compacted)) - } - if len(garbage) != 8 { - t.Fatalf("unexpected garbage: %d", len(garbage)) - } -} - -func TestIntervalMerging(t *testing.T) { - - testcases := []struct { - Chunks []*filer_pb.FileChunk - Expected []*VisibleInterval - }{ - // case 0: normal - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, - {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 100, fileId: "abc"}, - {start: 100, stop: 200, fileId: "asdf"}, - {start: 200, stop: 300, fileId: "fsad"}, - }, - }, - // case 1: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "asdf"}, - }, - }, - // case 2: updates overwrite part of previous chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 50, fileId: "asdf"}, - {start: 50, stop: 100, fileId: "abc"}, - }, - }, - // case 3: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 50, fileId: "asdf"}, - {start: 50, stop: 300, fileId: "xxxx"}, - }, - }, - // case 4: updates far away from prev chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "asdf"}, - {start: 250, stop: 500, fileId: "xxxx"}, - }, - }, - // case 5: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184}, - {Offset: 70, Size: 150, FileId: "abc", Mtime: 143}, - {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 200, fileId: "asdf"}, - {start: 200, stop: 220, fileId: "abc"}, - }, - }, - // case 6: same updates - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 100, fileId: "abc"}, - }, - }, - // case 7: real updates - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123}, - {Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130}, - {Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140}, - {Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150}, - {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160}, - {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 2097152, fileId: "3,029565bf3092"}, - {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"}, - {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"}, - {start: 8388608, stop: 11534336, fileId: "5,02982f80de50"}, - {start: 11534336, stop: 14376529, fileId: "7,0299ad723803"}, - }, - }, - // case 8: real bug - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 77824, FileId: "4,0b3df938e301", Mtime: 123}, - {Offset: 471040, Size: 472225 - 471040, FileId: "6,0b3e0650019c", Mtime: 130}, - {Offset: 77824, Size: 208896 - 77824, FileId: "4,0b3f0c7202f0", Mtime: 140}, - {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150}, - {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160}, - }, - Expected: []*VisibleInterval{ - {start: 0, stop: 77824, fileId: "4,0b3df938e301"}, - {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"}, - {start: 208896, stop: 339968, fileId: "2,0b4031a72689"}, - {start: 339968, stop: 471040, fileId: "3,0b416a557362"}, - {start: 471040, stop: 472225, fileId: "6,0b3e0650019c"}, - }, - }, - } - - for i, testcase := range testcases { - log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) - intervals := NonOverlappingVisibleIntervals(testcase.Chunks) - for x, interval := range intervals { - log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", - i, x, interval.start, interval.stop, interval.fileId) - } - for x, interval := range intervals { - if interval.start != testcase.Expected[x].start { - t.Fatalf("failed on test case %d, interval %d, start %d, expect %d", - i, x, interval.start, testcase.Expected[x].start) - } - if interval.stop != testcase.Expected[x].stop { - t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d", - i, x, interval.stop, testcase.Expected[x].stop) - } - if interval.fileId != testcase.Expected[x].fileId { - t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s", - i, x, interval.fileId, testcase.Expected[x].fileId) - } - } - if len(intervals) != len(testcase.Expected) { - t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected)) - } - - } - -} - -func TestChunksReading(t *testing.T) { - - testcases := []struct { - Chunks []*filer_pb.FileChunk - Offset int64 - Size int - Expected []*ChunkView - }{ - // case 0: normal - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, - {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, - }, - Offset: 0, - Size: 250, - Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, - {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200}, - }, - }, - // case 1: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - }, - Offset: 50, - Size: 100, - Expected: []*ChunkView{ - {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50}, - }, - }, - // case 2: updates overwrite part of previous chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134}, - }, - Offset: 25, - Size: 50, - Expected: []*ChunkView{ - {Offset: 25, Size: 25, FileId: "asdf", LogicOffset: 25}, - {Offset: 0, Size: 25, FileId: "abc", LogicOffset: 50}, - }, - }, - // case 3: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Offset: 0, - Size: 200, - Expected: []*ChunkView{ - {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50}, - }, - }, - // case 4: updates far away from prev chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134}, - {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154}, - }, - Offset: 0, - Size: 400, - Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0}, - // {Offset: 0, Size: 150, FileId: "xxxx"}, // missing intervals should not happen - }, - }, - // case 5: updates overwrite full chunks - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184}, - {Offset: 70, Size: 150, FileId: "abc", Mtime: 143}, - {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134}, - }, - Offset: 0, - Size: 220, - Expected: []*ChunkView{ - {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0}, - {Offset: 0, Size: 20, FileId: "abc", LogicOffset: 200}, - }, - }, - // case 6: same updates - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - }, - Offset: 0, - Size: 100, - Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, - }, - }, - // case 7: edge cases - { - Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134}, - {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353}, - }, - Offset: 0, - Size: 200, - Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, - {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100}, - }, - }, - } - - for i, testcase := range testcases { - log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) - chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size) - for x, chunk := range chunks { - log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", - i, x, chunk.Offset, chunk.Size, chunk.FileId) - if chunk.Offset != testcase.Expected[x].Offset { - t.Fatalf("failed on read case %d, chunk %d, Offset %d, expect %d", - i, x, chunk.Offset, testcase.Expected[x].Offset) - } - if chunk.Size != testcase.Expected[x].Size { - t.Fatalf("failed on read case %d, chunk %d, Size %d, expect %d", - i, x, chunk.Size, testcase.Expected[x].Size) - } - if chunk.FileId != testcase.Expected[x].FileId { - t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s", - i, x, chunk.FileId, testcase.Expected[x].FileId) - } - if chunk.LogicOffset != testcase.Expected[x].LogicOffset { - t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d", - i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset) - } - } - if len(chunks) != len(testcase.Expected) { - t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected)) - } - } - -} - -func BenchmarkCompactFileChunks(b *testing.B) { - - var chunks []*filer_pb.FileChunk - - k := 1024 - - for n := 0; n < k; n++ { - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n), - }) - chunks = append(chunks, &filer_pb.FileChunk{ - Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k), - }) - } - - for n := 0; n < b.N; n++ { - CompactFileChunks(chunks) - } -} diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go deleted file mode 100644 index b724e20fd..000000000 --- a/weed/filer2/filer.go +++ /dev/null @@ -1,253 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "os" - "path/filepath" - "strings" - "time" - - "google.golang.org/grpc" - - "github.com/karlseguin/ccache" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -const PaginationSize = 1024 * 256 - -var ( - OS_UID = uint32(os.Getuid()) - OS_GID = uint32(os.Getgid()) -) - -type Filer struct { - store *FilerStoreWrapper - directoryCache *ccache.Cache - MasterClient *wdclient.MasterClient - fileIdDeletionChan chan string - GrpcDialOption grpc.DialOption -} - -func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer { - f := &Filer{ - directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), - fileIdDeletionChan: make(chan string, PaginationSize), - GrpcDialOption: grpcDialOption, - } - - go f.loopProcessingDeletion() - - return f -} - -func (f *Filer) SetStore(store FilerStore) { - f.store = NewFilerStoreWrapper(store) -} - -func (f *Filer) DisableDirectoryCache() { - f.directoryCache = nil -} - -func (fs *Filer) GetMaster() string { - return fs.MasterClient.GetMaster() -} - -func (fs *Filer) KeepConnectedToMaster() { - fs.MasterClient.KeepConnectedToMaster() -} - -func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { - return f.store.BeginTransaction(ctx) -} - -func (f *Filer) CommitTransaction(ctx context.Context) error { - return f.store.CommitTransaction(ctx) -} - -func (f *Filer) RollbackTransaction(ctx context.Context) error { - return f.store.RollbackTransaction(ctx) -} - -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { - - if string(entry.FullPath) == "/" { - return nil - } - - dirParts := strings.Split(string(entry.FullPath), "/") - - // fmt.Printf("directory parts: %+v\n", dirParts) - - var lastDirectoryEntry *Entry - - for i := 1; i < len(dirParts); i++ { - dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...)) - // fmt.Printf("%d directory: %+v\n", i, dirPath) - - // first check local cache - dirEntry := f.cacheGetDirectory(dirPath) - - // not found, check the store directly - if dirEntry == nil { - glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) - } else { - glog.V(4).Infof("found cached directory: %s", dirPath) - } - - // no such existing directory - if dirEntry == nil { - - // create the directory - now := time.Now() - - dirEntry = &Entry{ - FullPath: FullPath(dirPath), - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | 0770, - Uid: entry.Uid, - Gid: entry.Gid, - }, - } - - glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.store.InsertEntry(ctx, dirEntry) - if mkdirErr != nil { - if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound { - return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) - } - } else { - f.NotifyUpdateEvent(nil, dirEntry, false) - } - - } else if !dirEntry.IsDirectory() { - return fmt.Errorf("%s is a file", dirPath) - } - - // cache the directory entry - f.cacheSetDirectory(dirPath, dirEntry, i) - - // remember the direct parent directory entry - if i == len(dirParts)-1 { - lastDirectoryEntry = dirEntry - } - - } - - if lastDirectoryEntry == nil { - return fmt.Errorf("parent folder not found: %v", entry.FullPath) - } - - /* - if !hasWritePermission(lastDirectoryEntry, entry) { - glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d", - lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid) - return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath) - } - */ - - oldEntry, _ := f.FindEntry(ctx, entry.FullPath) - - if oldEntry == nil { - if err := f.store.InsertEntry(ctx, entry); err != nil { - glog.Errorf("insert entry %s: %v", entry.FullPath, err) - return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) - } - } else { - if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil { - glog.Errorf("update entry %s: %v", entry.FullPath, err) - return fmt.Errorf("update entry %s: %v", entry.FullPath, err) - } - } - - f.NotifyUpdateEvent(oldEntry, entry, true) - - f.deleteChunksIfNotNew(oldEntry, entry) - - return nil -} - -func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { - if oldEntry != nil { - if oldEntry.IsDirectory() && !entry.IsDirectory() { - glog.Errorf("existing %s is a directory", entry.FullPath) - return fmt.Errorf("existing %s is a directory", entry.FullPath) - } - if !oldEntry.IsDirectory() && entry.IsDirectory() { - glog.Errorf("existing %s is a file", entry.FullPath) - return fmt.Errorf("existing %s is a file", entry.FullPath) - } - } - return f.store.UpdateEntry(ctx, entry) -} - -func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) { - - now := time.Now() - - if string(p) == "/" { - return &Entry{ - FullPath: p, - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | 0755, - Uid: OS_UID, - Gid: OS_GID, - }, - }, nil - } - return f.store.FindEntry(ctx, p) -} - -func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { - if strings.HasSuffix(string(p), "/") && len(p) > 1 { - p = p[0 : len(p)-1] - } - return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) -} - -func (f *Filer) cacheDelDirectory(dirpath string) { - - if dirpath == "/" { - return - } - - if f.directoryCache == nil { - return - } - f.directoryCache.Delete(dirpath) - return -} - -func (f *Filer) cacheGetDirectory(dirpath string) *Entry { - - if f.directoryCache == nil { - return nil - } - item := f.directoryCache.Get(dirpath) - if item == nil { - return nil - } - return item.Value().(*Entry) -} - -func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { - - if f.directoryCache == nil { - return - } - - minutes := 60 - if level < 10 { - minutes -= level * 6 - } - - f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute) -} diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go deleted file mode 100644 index 1a10f7c20..000000000 --- a/weed/filer2/filer_client_util.go +++ /dev/null @@ -1,172 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "io" - "math" - "strings" - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func VolumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} - -type FilerClient interface { - WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error -} - -func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { - var vids []string - for _, chunkView := range chunkViews { - vids = append(vids, VolumeId(chunkView.FileId)) - } - - vid2Locations := make(map[string]*filer_pb.Locations) - - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return err - } - - vid2Locations = resp.LocationsMap - - return nil - }) - - if err != nil { - return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) - } - - var wg sync.WaitGroup - for _, chunkView := range chunkViews { - wg.Add(1) - go func(chunkView *ChunkView) { - defer wg.Done() - - glog.V(4).Infof("read fh reading chunk: %+v", chunkView) - - locations := vid2Locations[VolumeId(chunkView.FileId)] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", chunkView.FileId) - err = fmt.Errorf("failed to locate %s", chunkView.FileId) - return - } - - var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], - !chunkView.IsFullChunk) - - if err != nil { - - glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) - - err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) - return - } - - glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) - totalRead += n - - }(chunkView) - } - wg.Wait() - return -} - -func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) { - - dir, name := FullPath(fullFilePath).DirAndName() - - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir, - Name: name, - } - - glog.V(3).Infof("read %s request: %v", fullFilePath, request) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) { - return nil - } - glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err) - return err - } - - if resp.Entry == nil { - glog.V(3).Infof("read %s entry: %v", fullFilePath, entry) - return nil - } - - entry = resp.Entry - return nil - }) - - return -} - -func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { - - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - lastEntryName := "" - - request := &filer_pb.ListEntriesRequest{ - Directory: fullDirPath, - Prefix: prefix, - StartFromFileName: lastEntryName, - Limit: math.MaxUint32, - } - - glog.V(3).Infof("read directory: %v", request) - stream, err := client.ListEntries(ctx, request) - if err != nil { - return fmt.Errorf("list %s: %v", fullDirPath, err) - } - - var prevEntry *filer_pb.Entry - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - if prevEntry != nil { - fn(prevEntry, true) - } - break - } else { - return recvErr - } - } - if prevEntry != nil { - fn(prevEntry, false) - } - prevEntry = resp.Entry - } - - return nil - - }) - - return -} diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go deleted file mode 100644 index 75a09e7ef..000000000 --- a/weed/filer2/filer_delete_entry.go +++ /dev/null @@ -1,102 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { - if p == "/" { - return nil - } - - entry, findErr := f.FindEntry(ctx, p) - if findErr != nil { - return findErr - } - - var chunks []*filer_pb.FileChunk - chunks = append(chunks, entry.Chunks...) - if entry.IsDirectory() { - // delete the folder children, not including the folder itself - var dirChunks []*filer_pb.FileChunk - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks) - if err != nil { - return fmt.Errorf("delete directory %s: %v", p, err) - } - chunks = append(chunks, dirChunks...) - f.cacheDelDirectory(string(p)) - } - // delete the file or folder - err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks) - if err != nil { - return fmt.Errorf("delete file %s: %v", p, err) - } - - if shouldDeleteChunks { - go f.DeleteChunks(chunks) - } - - return nil -} - -func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) { - - lastFileName := "" - includeLastFile := false - for { - entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize) - if err != nil { - glog.Errorf("list folder %s: %v", entry.FullPath, err) - return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) - } - if lastFileName == "" && !isRecursive && len(entries) > 0 { - // only for first iteration in the loop - return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) - } - - for _, sub := range entries { - lastFileName = sub.Name() - var dirChunks []*filer_pb.FileChunk - if sub.IsDirectory() { - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks) - } - if err != nil && !ignoreRecursiveError { - return nil, err - } - if shouldDeleteChunks { - chunks = append(chunks, dirChunks...) - } - } - - if len(entries) < PaginationSize { - break - } - } - - f.cacheDelDirectory(string(entry.FullPath)) - - glog.V(3).Infof("deleting directory %v", entry.FullPath) - - if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { - return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) - } - f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) - - return chunks, nil -} - -func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) { - - glog.V(3).Infof("deleting entry %v", entry.FullPath) - - if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { - return fmt.Errorf("filer store delete: %v", storeDeletionErr) - } - f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) - - return nil -} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go deleted file mode 100644 index 9937685f7..000000000 --- a/weed/filer2/filer_deletion.go +++ /dev/null @@ -1,87 +0,0 @@ -package filer2 - -import ( - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (f *Filer) loopProcessingDeletion() { - - ticker := time.NewTicker(5 * time.Second) - - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { - m := make(map[string]operation.LookupResult) - for _, vid := range vids { - locs, _ := f.MasterClient.GetVidLocations(vid) - var locations []operation.Location - for _, loc := range locs { - locations = append(locations, operation.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = operation.LookupResult{ - VolumeId: vid, - Locations: locations, - } - } - return m, nil - } - - var fileIds []string - for { - select { - case fid := <-f.fileIdDeletionChan: - fileIds = append(fileIds, fid) - if len(fileIds) >= 4096 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] - } - } - } -} - -func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { - for _, chunk := range chunks { - f.fileIdDeletionChan <- chunk.GetFileIdString() - } -} - -// DeleteFileByFileId direct delete by file id. -// Only used when the fileId is not being managed by snapshots. -func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionChan <- fileId -} - -func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { - - if oldEntry == nil { - return - } - if newEntry == nil { - f.DeleteChunks(oldEntry.Chunks) - } - - var toDelete []*filer_pb.FileChunk - newChunkIds := make(map[string]bool) - for _, newChunk := range newEntry.Chunks { - newChunkIds[newChunk.GetFileIdString()] = true - } - - for _, oldChunk := range oldEntry.Chunks { - if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { - toDelete = append(toDelete, oldChunk) - } - } - f.DeleteChunks(toDelete) -} diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go deleted file mode 100644 index c37381116..000000000 --- a/weed/filer2/filer_notify.go +++ /dev/null @@ -1,39 +0,0 @@ -package filer2 - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/notification" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) { - var key string - if oldEntry != nil { - key = string(oldEntry.FullPath) - } else if newEntry != nil { - key = string(newEntry.FullPath) - } else { - return - } - - if notification.Queue != nil { - - glog.V(3).Infof("notifying entry update %v", key) - - newParentPath := "" - if newEntry != nil { - newParentPath, _ = newEntry.FullPath.DirAndName() - } - - notification.Queue.SendMessage( - key, - &filer_pb.EventNotification{ - OldEntry: oldEntry.ToProtoEntry(), - NewEntry: newEntry.ToProtoEntry(), - DeleteChunks: deleteChunks, - NewParentPath: newParentPath, - }, - ) - - } -} diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go deleted file mode 100644 index b74e2ad35..000000000 --- a/weed/filer2/filer_notify_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package filer2 - -import ( - "testing" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/golang/protobuf/proto" -) - -func TestProtoMarshalText(t *testing.T) { - - oldEntry := &Entry{ - FullPath: FullPath("/this/path/to"), - Attr: Attr{ - Mtime: time.Now(), - Mode: 0644, - Uid: 1, - Mime: "text/json", - TtlSec: 25, - }, - Chunks: []*filer_pb.FileChunk{ - &filer_pb.FileChunk{ - FileId: "234,2423423422", - Offset: 234234, - Size: 234, - Mtime: 12312423, - ETag: "2342342354", - SourceFileId: "23234,2342342342", - }, - }, - } - - notification := &filer_pb.EventNotification{ - OldEntry: oldEntry.ToProtoEntry(), - NewEntry: nil, - DeleteChunks: true, - } - - text := proto.MarshalTextString(notification) - - notification2 := &filer_pb.EventNotification{} - proto.UnmarshalText(text, notification2) - - if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId { - t.Fatalf("marshal/unmarshal error: %s", text) - } - - println(text) - -} diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go deleted file mode 100644 index 0bb0bd611..000000000 --- a/weed/filer2/filerstore.go +++ /dev/null @@ -1,138 +0,0 @@ -package filer2 - -import ( - "context" - "errors" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type FilerStore interface { - // GetName gets the name to locate the configuration in filer.toml file - GetName() string - // Initialize initializes the file store - Initialize(configuration util.Configuration) error - InsertEntry(context.Context, *Entry) error - UpdateEntry(context.Context, *Entry) (err error) - // err == filer2.ErrNotFound if not found - FindEntry(context.Context, FullPath) (entry *Entry, err error) - DeleteEntry(context.Context, FullPath) (err error) - DeleteFolderChildren(context.Context, FullPath) (err error) - ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) - - BeginTransaction(ctx context.Context) (context.Context, error) - CommitTransaction(ctx context.Context) error - RollbackTransaction(ctx context.Context) error -} - -var ErrNotFound = errors.New("filer: no entry is found in filer store") - -type FilerStoreWrapper struct { - actualStore FilerStore -} - -func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { - if innerStore, ok := store.(*FilerStoreWrapper); ok { - return innerStore - } - return &FilerStoreWrapper{ - actualStore: store, - } -} - -func (fsw *FilerStoreWrapper) GetName() string { - return fsw.actualStore.GetName() -} - -func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error { - return fsw.actualStore.Initialize(configuration) -} - -func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { - stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - return fsw.actualStore.InsertEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { - stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - return fsw.actualStore.UpdateEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) - }() - - entry, err = fsw.actualStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - filer_pb.AfterEntryDeserialization(entry.Chunks) - return -} - -func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - return fsw.actualStore.DeleteEntry(ctx, fp) -} - -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) { - stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) - }() - - return fsw.actualStore.DeleteFolderChildren(ctx, fp) -} - -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { - stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) - }() - - entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - for _, entry := range entries { - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, err -} - -func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { - return fsw.actualStore.BeginTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { - return fsw.actualStore.CommitTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { - return fsw.actualStore.RollbackTransaction(ctx) -} diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go deleted file mode 100644 index 191e51cf3..000000000 --- a/weed/filer2/fullpath.go +++ /dev/null @@ -1,36 +0,0 @@ -package filer2 - -import ( - "path/filepath" - "strings" -) - -type FullPath string - -func NewFullPath(dir, name string) FullPath { - return FullPath(dir).Child(name) -} - -func (fp FullPath) DirAndName() (string, string) { - dir, name := filepath.Split(string(fp)) - if dir == "/" { - return dir, name - } - if len(dir) < 1 { - return "/", "" - } - return dir[:len(dir)-1], name -} - -func (fp FullPath) Name() string { - _, name := filepath.Split(string(fp)) - return name -} - -func (fp FullPath) Child(name string) FullPath { - dir := string(fp) - if strings.HasSuffix(dir, "/") { - return FullPath(dir + name) - } - return FullPath(dir + "/" + name) -} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go deleted file mode 100644 index 4952b3b3a..000000000 --- a/weed/filer2/leveldb/leveldb_store.go +++ /dev/null @@ -1,217 +0,0 @@ -package leveldb - -import ( - "bytes" - "context" - "fmt" - - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" - leveldb_util "github.com/syndtr/goleveldb/leveldb/util" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -const ( - DIR_FILE_SEPARATOR = byte(0x00) -) - -func init() { - filer2.Stores = append(filer2.Stores, &LevelDBStore{}) -} - -type LevelDBStore struct { - db *leveldb.DB -} - -func (store *LevelDBStore) GetName() string { - return "leveldb" -} - -func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) { - dir := configuration.GetString("dir") - return store.initialize(dir) -} - -func (store *LevelDBStore) initialize(dir string) (err error) { - glog.Infof("filer store dir: %s", dir) - if err := weed_util.TestFolderWritable(dir); err != nil { - return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) - } - - opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 10, - } - - if store.db, err = leveldb.OpenFile(dir, opts); err != nil { - glog.Infof("filer store open dir %s: %v", dir, err) - return - } - return -} - -func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *LevelDBStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - key := genKey(entry.DirAndName()) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - err = store.db.Put(key, value, nil) - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) - - return nil -} - -func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - key := genKey(fullpath.DirAndName()) - - data, err := store.db.Get(key, nil) - - if err == leveldb.ErrNotFound { - return nil, filer2.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) - - return entry, nil -} - -func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - key := genKey(fullpath.DirAndName()) - - err = store.db.Delete(key, nil) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - - batch := new(leveldb.Batch) - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - batch.Delete([]byte(genKey(string(fullpath), fileName))) - } - iter.Release() - - err = store.db.Write(batch, nil) - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - if fileName == startFileName && !inclusive { - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), - } - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - iter.Release() - - return entries, err -} - -func genKey(dirPath, fileName string) (key []byte) { - key = []byte(dirPath) - key = append(key, DIR_FILE_SEPARATOR) - key = append(key, []byte(fileName)...) - return key -} - -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { - keyPrefix = []byte(string(fullpath)) - keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix -} - -func getNameFromKey(key []byte) string { - - sepIndex := len(key) - 1 - for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR { - sepIndex-- - } - - return string(key[sepIndex+1:]) - -} diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go deleted file mode 100644 index 904de8c97..000000000 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package leveldb - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer2" - "io/ioutil" - "os" - "testing" -) - -func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) - store := &LevelDBStore{} - store.initialize(dir) - filer.SetStore(store) - filer.DisableDirectoryCache() - - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") - - ctx := context.Background() - - entry1 := &filer2.Entry{ - FullPath: fullpath, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - if err := filer.CreateEntry(ctx, entry1); err != nil { - t.Errorf("create entry %v: %v", entry1.FullPath, err) - return - } - - entry, err := filer.FindEntry(ctx, fullpath) - - if err != nil { - t.Errorf("find entry: %v", err) - return - } - - if entry.FullPath != entry1.FullPath { - t.Errorf("find wrong entry: %v", entry.FullPath) - return - } - - // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} - -func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) - store := &LevelDBStore{} - store.initialize(dir) - filer.SetStore(store) - filer.DisableDirectoryCache() - - ctx := context.Background() - - // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) - if err != nil { - t.Errorf("list entries: %v", err) - return - } - if len(entries) != 0 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go deleted file mode 100644 index 8a16822ab..000000000 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ /dev/null @@ -1,237 +0,0 @@ -package leveldb - -import ( - "bytes" - "context" - "crypto/md5" - "fmt" - "io" - "os" - - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" - leveldb_util "github.com/syndtr/goleveldb/leveldb/util" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -func init() { - filer2.Stores = append(filer2.Stores, &LevelDB2Store{}) -} - -type LevelDB2Store struct { - dbs []*leveldb.DB - dbCount int -} - -func (store *LevelDB2Store) GetName() string { - return "leveldb2" -} - -func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) { - dir := configuration.GetString("dir") - return store.initialize(dir, 8) -} - -func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { - glog.Infof("filer store leveldb2 dir: %s", dir) - if err := weed_util.TestFolderWritable(dir); err != nil { - return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) - } - - opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - } - - for d := 0; d < dbCount; d++ { - dbFolder := fmt.Sprintf("%s/%02d", dir, d) - os.MkdirAll(dbFolder, 0755) - db, dbErr := leveldb.OpenFile(dbFolder, opts) - if dbErr != nil { - glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr) - return - } - store.dbs = append(store.dbs, db) - } - store.dbCount = dbCount - - return -} - -func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - dir, name := entry.DirAndName() - key, partitionId := genKey(dir, name, store.dbCount) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - err = store.dbs[partitionId].Put(key, value, nil) - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) - - return nil -} - -func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - dir, name := fullpath.DirAndName() - key, partitionId := genKey(dir, name, store.dbCount) - - data, err := store.dbs[partitionId].Get(key, nil) - - if err == leveldb.ErrNotFound { - return nil, filer2.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) - - return entry, nil -} - -func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - dir, name := fullpath.DirAndName() - key, partitionId := genKey(dir, name, store.dbCount) - - err = store.dbs[partitionId].Delete(key, nil) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) - - batch := new(leveldb.Batch) - - iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - batch.Delete(append(directoryPrefix, []byte(fileName)...)) - } - iter.Release() - - err = store.dbs[partitionId].Write(batch, nil) - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) - lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount) - - iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil) - for iter.Next() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - continue - } - if fileName == startFileName && !inclusive { - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), - } - - // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - } - iter.Release() - - return entries, err -} - -func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) { - key, partitionId = hashToBytes(dirPath, dbCount) - key = append(key, []byte(fileName)...) - return key, partitionId -} - -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { - keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix, partitionId -} - -func getNameFromKey(key []byte) string { - - return string(key[md5.Size:]) - -} - -// hash directory, and use last byte for partitioning -func hashToBytes(dir string, dbCount int) ([]byte, int) { - h := md5.New() - io.WriteString(h, dir) - - b := h.Sum(nil) - - x := b[len(b)-1] - - return b, int(x) % dbCount -} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go deleted file mode 100644 index e28ef7dac..000000000 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package leveldb - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer2" - "io/ioutil" - "os" - "testing" -) - -func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) - store := &LevelDB2Store{} - store.initialize(dir, 2) - filer.SetStore(store) - filer.DisableDirectoryCache() - - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") - - ctx := context.Background() - - entry1 := &filer2.Entry{ - FullPath: fullpath, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - if err := filer.CreateEntry(ctx, entry1); err != nil { - t.Errorf("create entry %v: %v", entry1.FullPath, err) - return - } - - entry, err := filer.FindEntry(ctx, fullpath) - - if err != nil { - t.Errorf("find entry: %v", err) - return - } - - if entry.FullPath != entry1.FullPath { - t.Errorf("find wrong entry: %v", entry.FullPath) - return - } - - // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} - -func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) - store := &LevelDB2Store{} - store.initialize(dir, 2) - filer.SetStore(store) - filer.DisableDirectoryCache() - - ctx := context.Background() - - // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) - if err != nil { - t.Errorf("list entries: %v", err) - return - } - if len(entries) != 0 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go deleted file mode 100644 index d1b06ece5..000000000 --- a/weed/filer2/mysql/mysql_store.go +++ /dev/null @@ -1,74 +0,0 @@ -package mysql - -import ( - "database/sql" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" - "github.com/chrislusf/seaweedfs/weed/util" - _ "github.com/go-sql-driver/mysql" -) - -const ( - CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8" -) - -func init() { - filer2.Stores = append(filer2.Stores, &MysqlStore{}) -} - -type MysqlStore struct { - abstract_sql.AbstractSqlStore -} - -func (store *MysqlStore) GetName() string { - return "mysql" -} - -func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) { - return store.initialize( - configuration.GetString("username"), - configuration.GetString("password"), - configuration.GetString("hostname"), - configuration.GetInt("port"), - configuration.GetString("database"), - configuration.GetInt("connection_max_idle"), - configuration.GetInt("connection_max_open"), - configuration.GetBool("interpolateParams"), - ) -} - -func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int, - interpolateParams bool) (err error) { - - store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" - store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" - store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" - store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" - store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?" - - sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) - if interpolateParams { - sqlUrl += "&interpolateParams=true" - } - - var dbErr error - store.DB, dbErr = sql.Open("mysql", sqlUrl) - if dbErr != nil { - store.DB.Close() - store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) - } - - store.DB.SetMaxIdleConns(maxIdle) - store.DB.SetMaxOpenConns(maxOpen) - - if err = store.DB.Ping(); err != nil { - return fmt.Errorf("connect to %s error:%v", sqlUrl, err) - } - - return nil -} diff --git a/weed/filer2/permission.go b/weed/filer2/permission.go deleted file mode 100644 index 8a9508fbc..000000000 --- a/weed/filer2/permission.go +++ /dev/null @@ -1,22 +0,0 @@ -package filer2 - -func hasWritePermission(dir *Entry, entry *Entry) bool { - - if dir == nil { - return false - } - - if dir.Uid == entry.Uid && dir.Mode&0200 > 0 { - return true - } - - if dir.Gid == entry.Gid && dir.Mode&0020 > 0 { - return true - } - - if dir.Mode&0002 > 0 { - return true - } - - return false -} diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt deleted file mode 100644 index cb0c99c63..000000000 --- a/weed/filer2/postgres/README.txt +++ /dev/null @@ -1,17 +0,0 @@ - -1. create "seaweedfs" database - -export PGHOME=/Library/PostgreSQL/10 -$PGHOME/bin/createdb --username=postgres --password seaweedfs - -2. create "filemeta" table -$PGHOME/bin/psql --username=postgres --password seaweedfs - -CREATE TABLE IF NOT EXISTS filemeta ( - dirhash BIGINT, - name VARCHAR(65535), - directory VARCHAR(65535), - meta bytea, - PRIMARY KEY (dirhash, name) -); - diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go deleted file mode 100644 index 3ec000fe0..000000000 --- a/weed/filer2/postgres/postgres_store.go +++ /dev/null @@ -1,69 +0,0 @@ -package postgres - -import ( - "database/sql" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql" - "github.com/chrislusf/seaweedfs/weed/util" - _ "github.com/lib/pq" -) - -const ( - CONNECTION_URL_PATTERN = "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30" -) - -func init() { - filer2.Stores = append(filer2.Stores, &PostgresStore{}) -} - -type PostgresStore struct { - abstract_sql.AbstractSqlStore -} - -func (store *PostgresStore) GetName() string { - return "postgres" -} - -func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) { - return store.initialize( - configuration.GetString("username"), - configuration.GetString("password"), - configuration.GetString("hostname"), - configuration.GetInt("port"), - configuration.GetString("database"), - configuration.GetString("sslmode"), - configuration.GetInt("connection_max_idle"), - configuration.GetInt("connection_max_open"), - ) -} - -func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) { - - store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)" - store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" - store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" - store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" - store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" - - sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, password, database, sslmode) - var dbErr error - store.DB, dbErr = sql.Open("postgres", sqlUrl) - if dbErr != nil { - store.DB.Close() - store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) - } - - store.DB.SetMaxIdleConns(maxIdle) - store.DB.SetMaxOpenConns(maxOpen) - - if err = store.DB.Ping(); err != nil { - return fmt.Errorf("connect to %s error:%v", sqlUrl, err) - } - - return nil -} diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go deleted file mode 100644 index f1ad4b35c..000000000 --- a/weed/filer2/redis/redis_cluster_store.go +++ /dev/null @@ -1,42 +0,0 @@ -package redis - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" -) - -func init() { - filer2.Stores = append(filer2.Stores, &RedisClusterStore{}) -} - -type RedisClusterStore struct { - UniversalRedisStore -} - -func (store *RedisClusterStore) GetName() string { - return "redis_cluster" -} - -func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) { - - configuration.SetDefault("useReadOnly", true) - configuration.SetDefault("routeByLatency", true) - - return store.initialize( - configuration.GetStringSlice("addresses"), - configuration.GetString("password"), - configuration.GetBool("useReadOnly"), - configuration.GetBool("routeByLatency"), - ) -} - -func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { - store.Client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: addresses, - Password: password, - ReadOnly: readOnly, - RouteByLatency: routeByLatency, - }) - return -} diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go deleted file mode 100644 index c56fa014c..000000000 --- a/weed/filer2/redis/redis_store.go +++ /dev/null @@ -1,36 +0,0 @@ -package redis - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" -) - -func init() { - filer2.Stores = append(filer2.Stores, &RedisStore{}) -} - -type RedisStore struct { - UniversalRedisStore -} - -func (store *RedisStore) GetName() string { - return "redis" -} - -func (store *RedisStore) Initialize(configuration util.Configuration) (err error) { - return store.initialize( - configuration.GetString("address"), - configuration.GetString("password"), - configuration.GetInt("database"), - ) -} - -func (store *RedisStore) initialize(hostPort string, password string, database int) (err error) { - store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Password: password, - DB: database, - }) - return -} diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go deleted file mode 100644 index 62257e91e..000000000 --- a/weed/filer2/redis/universal_redis_store.go +++ /dev/null @@ -1,171 +0,0 @@ -package redis - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/go-redis/redis" - "sort" - "strings" - "time" -) - -const ( - DIR_LIST_MARKER = "\x00" -) - -type UniversalRedisStore struct { - Client redis.UniversalClient -} - -func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - dir, name := entry.FullPath.DirAndName() - if name != "" { - _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) - } - } - - return nil -} - -func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - - data, err := store.Client.Get(string(fullpath)).Result() - if err == redis.Nil { - return nil, filer2.ErrNotFound - } - - if err != nil { - return nil, fmt.Errorf("get %s : %v", fullpath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks([]byte(data)) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil -} - -func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - - _, err = store.Client.Del(string(fullpath)).Result() - - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - dir, name := fullpath.DirAndName() - if name != "" { - _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) - } - } - - return nil -} - -func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - - members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() - if err != nil { - return fmt.Errorf("delete folder %s : %v", fullpath, err) - } - - for _, fileName := range members { - path := filer2.NewFullPath(string(fullpath), fileName) - _, err = store.Client.Del(string(path)).Result() - if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) - } - } - - return nil -} - -func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() - if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) - } - - // skip - if startFileName != "" { - var t []string - for _, m := range members { - if strings.Compare(m, startFileName) >= 0 { - if m == startFileName { - if inclusive { - t = append(t, m) - } - } else { - t = append(t, m) - } - } - } - members = t - } - - // sort - sort.Slice(members, func(i, j int) bool { - return strings.Compare(members[i], members[j]) < 0 - }) - - // limit - if limit < len(members) { - members = members[:limit] - } - - // fetch entry meta - for _, fileName := range members { - path := filer2.NewFullPath(string(fullpath), fileName) - entry, err := store.FindEntry(ctx, path) - if err != nil { - glog.V(0).Infof("list %s : %v", path, err) - } else { - entries = append(entries, entry) - } - } - - return entries, err -} - -func genDirectoryListKey(dir string) (dirList string) { - return dir + DIR_LIST_MARKER -} diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go deleted file mode 100644 index 01b87cad1..000000000 --- a/weed/filer2/stream.go +++ /dev/null @@ -1,41 +0,0 @@ -package filer2 - -import ( - "io" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/wdclient" -) - -func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error { - - chunkViews := ViewFromChunks(chunks, offset, size) - - fileId2Url := make(map[string]string) - - for _, chunkView := range chunkViews { - - urlString, err := masterClient.LookupFileId(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return err - } - fileId2Url[chunkView.FileId] = urlString - } - - for _, chunkView := range chunkViews { - urlString := fileId2Url[chunkView.FileId] - _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err - } - } - - return nil - -} diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go deleted file mode 100644 index 4eb8cb90d..000000000 --- a/weed/filer2/tikv/tikv_store.go +++ /dev/null @@ -1,251 +0,0 @@ -// +build !386 -// +build !arm - -package tikv - -import ( - "bytes" - "context" - "crypto/md5" - "fmt" - "io" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" - - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/store/tikv" -) - -func init() { - filer2.Stores = append(filer2.Stores, &TikvStore{}) -} - -type TikvStore struct { - store kv.Storage -} - -func (store *TikvStore) GetName() string { - return "tikv" -} - -func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) { - pdAddr := configuration.GetString("pdAddress") - return store.initialize(pdAddr) -} - -func (store *TikvStore) initialize(pdAddr string) (err error) { - glog.Infof("filer store tikv pd address: %s", pdAddr) - - driver := tikv.Driver{} - - store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr)) - - if err != nil { - return fmt.Errorf("open tikv %s : %v", pdAddr, err) - } - - return -} - -func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { - tx, err := store.store.Begin() - if err != nil { - return ctx, err - } - return context.WithValue(ctx, "tx", tx), nil -} -func (store *TikvStore) CommitTransaction(ctx context.Context) error { - tx, ok := ctx.Value("tx").(kv.Transaction) - if ok { - return tx.Commit(ctx) - } - return nil -} -func (store *TikvStore) RollbackTransaction(ctx context.Context) error { - tx, ok := ctx.Value("tx").(kv.Transaction) - if ok { - return tx.Rollback() - } - return nil -} - -func (store *TikvStore) getTx(ctx context.Context) kv.Transaction { - if tx, ok := ctx.Value("tx").(kv.Transaction); ok { - return tx - } - return nil -} - -func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - dir, name := entry.DirAndName() - key := genKey(dir, name) - - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - err = store.getTx(ctx).Set(key, value) - - if err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) - } - - // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) - - return nil -} - -func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - - return store.InsertEntry(ctx, entry) -} - -func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - dir, name := fullpath.DirAndName() - key := genKey(dir, name) - - data, err := store.getTx(ctx).Get(ctx, key) - - if err == kv.ErrNotExist { - return nil, filer2.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) - } - - entry = &filer2.Entry{ - FullPath: fullpath, - } - err = entry.DecodeAttributesAndChunks(data) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) - - return entry, nil -} - -func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - dir, name := fullpath.DirAndName() - key := genKey(dir, name) - - err = store.getTx(ctx).Delete(key) - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - return nil -} - -func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - - tx := store.getTx(ctx) - - iter, err := tx.Iter(directoryPrefix, nil) - if err != nil { - return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err) - } - defer iter.Close() - for iter.Valid() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - iter.Next() - continue - } - - if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) - } - - iter.Next() - } - - return nil -} - -func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - - directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName) - - iter, err := store.getTx(ctx).Iter(lastFileStart, nil) - if err != nil { - return nil, fmt.Errorf("list %s: %v", fullpath, err) - } - defer iter.Close() - for iter.Valid() { - key := iter.Key() - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - fileName := getNameFromKey(key) - if fileName == "" { - iter.Next() - continue - } - if fileName == startFileName && !inclusive { - iter.Next() - continue - } - limit-- - if limit < 0 { - break - } - entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), - } - - // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - entries = append(entries, entry) - iter.Next() - } - - return entries, err -} - -func genKey(dirPath, fileName string) (key []byte) { - key = hashToBytes(dirPath) - key = append(key, []byte(fileName)...) - return key -} - -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { - keyPrefix = hashToBytes(string(fullpath)) - if len(startFileName) > 0 { - keyPrefix = append(keyPrefix, []byte(startFileName)...) - } - return keyPrefix -} - -func getNameFromKey(key []byte) string { - - return string(key[md5.Size:]) - -} - -// hash directory -func hashToBytes(dir string) []byte { - h := md5.New() - io.WriteString(h, dir) - - b := h.Sum(nil) - - return b -} diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go deleted file mode 100644 index 36de2d974..000000000 --- a/weed/filer2/tikv/tikv_store_unsupported.go +++ /dev/null @@ -1,65 +0,0 @@ -// +build 386 arm - -package tikv - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer2" - weed_util "github.com/chrislusf/seaweedfs/weed/util" -) - -func init() { - filer2.Stores = append(filer2.Stores, &TikvStore{}) -} - -type TikvStore struct { -} - -func (store *TikvStore) GetName() string { - return "tikv" -} - -func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) initialize(pdAddr string) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return nil, fmt.Errorf("not implemented for 32 bit computers") -} -func (store *TikvStore) CommitTransaction(ctx context.Context) error { - return fmt.Errorf("not implemented for 32 bit computers") -} -func (store *TikvStore) RollbackTransaction(ctx context.Context) error { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - return nil, fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { - return fmt.Errorf("not implemented for 32 bit computers") -} - -func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer2.Entry, err error) { - return nil, fmt.Errorf("not implemented for 32 bit computers") -} |
