Diffstat (limited to 'weed/filer2')
-rw-r--r--  weed/filer2/abstract_sql/abstract_sql_store.go | 184
-rw-r--r--  weed/filer2/abstract_sql/hashing.go | 32
-rw-r--r--  weed/filer2/cassandra/README.txt | 14
-rw-r--r--  weed/filer2/cassandra/cassandra_store.go | 153
-rw-r--r--  weed/filer2/configuration.go | 51
-rw-r--r--  weed/filer2/entry.go | 73
-rw-r--r--  weed/filer2/entry_codec.go | 116
-rw-r--r--  weed/filer2/etcd/etcd_store.go | 196
-rw-r--r--  weed/filer2/filechunks.go | 228
-rw-r--r--  weed/filer2/filechunks_test.go | 384
-rw-r--r--  weed/filer2/filer.go | 253
-rw-r--r--  weed/filer2/filer_client_util.go | 172
-rw-r--r--  weed/filer2/filer_delete_entry.go | 102
-rw-r--r--  weed/filer2/filer_deletion.go | 87
-rw-r--r--  weed/filer2/filer_notify.go | 39
-rw-r--r--  weed/filer2/filer_notify_test.go | 51
-rw-r--r--  weed/filer2/filerstore.go | 138
-rw-r--r--  weed/filer2/fullpath.go | 36
-rw-r--r--  weed/filer2/leveldb/leveldb_store.go | 217
-rw-r--r--  weed/filer2/leveldb/leveldb_store_test.go | 88
-rw-r--r--  weed/filer2/leveldb2/leveldb2_store.go | 237
-rw-r--r--  weed/filer2/leveldb2/leveldb2_store_test.go | 88
-rw-r--r--  weed/filer2/mysql/mysql_store.go | 74
-rw-r--r--  weed/filer2/permission.go | 22
-rw-r--r--  weed/filer2/postgres/README.txt | 17
-rw-r--r--  weed/filer2/postgres/postgres_store.go | 69
-rw-r--r--  weed/filer2/redis/redis_cluster_store.go | 42
-rw-r--r--  weed/filer2/redis/redis_store.go | 36
-rw-r--r--  weed/filer2/redis/universal_redis_store.go | 171
-rw-r--r--  weed/filer2/stream.go | 41
-rw-r--r--  weed/filer2/tikv/tikv_store.go | 251
-rw-r--r--  weed/filer2/tikv/tikv_store_unsupported.go | 65
32 files changed, 0 insertions, 3727 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
deleted file mode 100644
index d512467c7..000000000
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ /dev/null
@@ -1,184 +0,0 @@
-package abstract_sql
-
-import (
- "context"
- "database/sql"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
-)
-
-type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlDeleteFolderChildren string
- SqlListExclusive string
- SqlListInclusive string
-}
-
-type TxOrDB interface {
- ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
- QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
- QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
-}
-
-func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
- Isolation: sql.LevelReadCommitted,
- ReadOnly: false,
- })
- if err != nil {
- return ctx, err
- }
-
- return context.WithValue(ctx, "tx", tx), nil
-}
-func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
- if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx.Commit()
- }
- return nil
-}
-func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
- if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx.Rollback()
- }
- return nil
-}
-
-func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
- if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx
- }
- return store.DB
-}
-
-func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta)
- if err != nil {
- return fmt.Errorf("insert %s: %s", entry.FullPath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("insert %s but no rows affected: %s", entry.FullPath, err)
- }
- return nil
-}
-
-func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir)
- if err != nil {
- return fmt.Errorf("update %s: %s", entry.FullPath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
- }
- return nil
-}
-
-func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
-
- dir, name := fullpath.DirAndName()
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir)
- var data []byte
- if err := row.Scan(&data); err != nil {
- return nil, filer2.ErrNotFound
- }
-
- entry := &filer2.Entry{
- FullPath: fullpath,
- }
- if err := entry.DecodeAttributesAndChunks(data); err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
-
- dir, name := fullpath.DirAndName()
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir)
- if err != nil {
- return fmt.Errorf("delete %s: %s", fullpath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
- }
-
- return nil
-}
-
-func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, hashToLong(string(fullpath)), fullpath)
- if err != nil {
- return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
- }
-
- return nil
-}
-
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
-
- sqlText := store.SqlListExclusive
- if inclusive {
- sqlText = store.SqlListInclusive
- }
-
- rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
- defer rows.Close()
-
- for rows.Next() {
- var name string
- var data []byte
- if err = rows.Scan(&name, &data); err != nil {
- glog.V(0).Infof("scan %s : %v", fullpath, err)
- return nil, fmt.Errorf("scan %s: %v", fullpath, err)
- }
-
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), name),
- }
- if err = entry.DecodeAttributesAndChunks(data); err != nil {
- glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
- return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
- }
-
- entries = append(entries, entry)
- }
-
- return entries, nil
-}
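[Note] The store above never hands out a *sql.Tx directly: BeginTransaction stashes the transaction in the context under the "tx" key, getTxOrDB routes every later call through it, and Commit/Rollback pull it back out. A minimal sketch of a caller, assuming the store is already initialized (insertAtomically is a hypothetical helper, not part of the package):

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
)

// insertAtomically wraps a single InsertEntry in the context-carried
// transaction: all statements issued with the returned ctx run on one
// *sql.Tx at READ COMMITTED isolation.
func insertAtomically(store *abstract_sql.AbstractSqlStore, entry *filer2.Entry) error {
	ctx, err := store.BeginTransaction(context.Background())
	if err != nil {
		return err
	}
	if err := store.InsertEntry(ctx, entry); err != nil {
		store.RollbackTransaction(ctx) // discard the partial write
		return err
	}
	return store.CommitTransaction(ctx)
}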
diff --git a/weed/filer2/abstract_sql/hashing.go b/weed/filer2/abstract_sql/hashing.go
deleted file mode 100644
index 5c982c537..000000000
--- a/weed/filer2/abstract_sql/hashing.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package abstract_sql
-
-import (
- "crypto/md5"
- "io"
-)
-
-// returns a 64 bit big int
-func hashToLong(dir string) (v int64) {
- h := md5.New()
- io.WriteString(h, dir)
-
- b := h.Sum(nil)
-
- v += int64(b[0])
- v <<= 8
- v += int64(b[1])
- v <<= 8
- v += int64(b[2])
- v <<= 8
- v += int64(b[3])
- v <<= 8
- v += int64(b[4])
- v <<= 8
- v += int64(b[5])
- v <<= 8
- v += int64(b[6])
- v <<= 8
- v += int64(b[7])
-
- return
-}
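[Note] The unrolled shift-and-add loop packs the first eight digest bytes big-endian into a signed 64-bit value. A bit-for-bit equivalent using encoding/binary, shown here only to make the intent explicit (hashToLongCompact is not part of the package):

package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
)

// hashToLongCompact mirrors hashToLong above: both interpret the first 8
// bytes of the MD5 digest as a big-endian 64-bit pattern, so the result
// can be negative when the top bit of the digest is set.
func hashToLongCompact(dir string) int64 {
	sum := md5.Sum([]byte(dir))
	return int64(binary.BigEndian.Uint64(sum[:8]))
}

func main() {
	fmt.Println(hashToLongCompact("/some/dir")) // stable directory hash
}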
diff --git a/weed/filer2/cassandra/README.txt b/weed/filer2/cassandra/README.txt
deleted file mode 100644
index 122c9c3f4..000000000
--- a/weed/filer2/cassandra/README.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-1. create a keyspace
-
-CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
-
-2. create filemeta table
-
- USE seaweedfs;
-
- CREATE TABLE filemeta (
- directory varchar,
- name varchar,
- meta blob,
- PRIMARY KEY (directory, name)
- ) WITH CLUSTERING ORDER BY (name ASC);
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
deleted file mode 100644
index dcaab8bc4..000000000
--- a/weed/filer2/cassandra/cassandra_store.go
+++ /dev/null
@@ -1,153 +0,0 @@
-package cassandra
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gocql/gocql"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &CassandraStore{})
-}
-
-type CassandraStore struct {
- cluster *gocql.ClusterConfig
- session *gocql.Session
-}
-
-func (store *CassandraStore) GetName() string {
- return "cassandra"
-}
-
-func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
- return store.initialize(
- configuration.GetString("keyspace"),
- configuration.GetStringSlice("hosts"),
- )
-}
-
-func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
- store.cluster = gocql.NewCluster(hosts...)
- store.cluster.Keyspace = keyspace
- store.cluster.Consistency = gocql.LocalQuorum
- store.session, err = store.cluster.CreateSession()
- if err != nil {
- glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
- }
- return
-}
-
-func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- if err := store.session.Query(
- "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
- dir, name, meta, entry.TtlSec).Exec(); err != nil {
- return fmt.Errorf("insert %s: %s", entry.FullPath, err)
- }
-
- return nil
-}
-
-func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
-
- dir, name := fullpath.DirAndName()
- var data []byte
- if err := store.session.Query(
- "SELECT meta FROM filemeta WHERE directory=? AND name=?",
- dir, name).Consistency(gocql.One).Scan(&data); err != nil {
- if err != gocql.ErrNotFound {
- return nil, filer2.ErrNotFound
- }
- }
-
- if len(data) == 0 {
- return nil, filer2.ErrNotFound
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
-
- dir, name := fullpath.DirAndName()
-
- if err := store.session.Query(
- "DELETE FROM filemeta WHERE directory=? AND name=?",
- dir, name).Exec(); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
-
- if err := store.session.Query(
- "DELETE FROM filemeta WHERE directory=?",
- fullpath).Exec(); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
- if inclusive {
- cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
- }
-
- var data []byte
- var name string
- iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
- for iter.Scan(&name, &data) {
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), name),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
- if err := iter.Close(); err != nil {
- glog.V(0).Infof("list iterator close: %v", err)
- }
-
- return entries, err
-}
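[Note] UpdateEntry above just delegates to InsertEntry: a Cassandra INSERT is an upsert, so the same "INSERT INTO filemeta ... USING TTL ?" statement serves both create and overwrite. A hedged illustration (upsertEntry is a hypothetical helper):

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
)

// upsertEntry relies on Cassandra's upsert semantics: calling it for a new
// or an existing path issues the identical INSERT statement.
func upsertEntry(store *cassandra.CassandraStore, entry *filer2.Entry) error {
	return store.UpdateEntry(context.Background(), entry)
}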
diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go
deleted file mode 100644
index 7b05b53dc..000000000
--- a/weed/filer2/configuration.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package filer2
-
-import (
- "os"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/spf13/viper"
-)
-
-var (
- Stores []FilerStore
-)
-
-func (f *Filer) LoadConfiguration(config *viper.Viper) {
-
- validateOneEnabledStore(config)
-
- for _, store := range Stores {
- if config.GetBool(store.GetName() + ".enabled") {
- viperSub := config.Sub(store.GetName())
- if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize store for %s: %+v",
- store.GetName(), err)
- }
- f.SetStore(store)
- glog.V(0).Infof("Configure filer for %s", store.GetName())
- return
- }
- }
-
- println()
- println("Supported filer stores are:")
- for _, store := range Stores {
- println(" " + store.GetName())
- }
-
- os.Exit(-1)
-}
-
-func validateOneEnabledStore(config *viper.Viper) {
- enabledStore := ""
- for _, store := range Stores {
- if config.GetBool(store.GetName() + ".enabled") {
- if enabledStore == "" {
- enabledStore = store.GetName()
- } else {
- glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName())
- }
- }
- }
-}
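[Note] LoadConfiguration selects exactly one backend: the first registered store whose "<name>.enabled" key is true receives its own config sub-tree, and enabling two stores is fatal. A hedged sketch that feeds the loader an in-memory config instead of filer.toml (the etcd keys match the Initialize code later in this diff):

package example

import (
	"strings"

	"github.com/spf13/viper"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	// blank import registers the backend in filer2.Stores via init()
	_ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
)

func configure(f *filer2.Filer) {
	v := viper.New()
	v.SetConfigType("toml")
	if err := v.ReadConfig(strings.NewReader(`
[etcd]
enabled = true
servers = "localhost:2379"
timeout = "3s"
`)); err != nil {
		panic(err)
	}
	f.LoadConfiguration(v) // picks etcd, calls its Initialize with the [etcd] sub-tree
}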
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
deleted file mode 100644
index c901927bb..000000000
--- a/weed/filer2/entry.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package filer2
-
-import (
- "os"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-type Attr struct {
- Mtime time.Time // time of last modification
- Crtime time.Time // time of creation (OS X only)
- Mode os.FileMode // file mode
- Uid uint32 // owner uid
- Gid uint32 // group gid
- Mime string // mime type
- Replication string // replication
- Collection string // collection name
- TtlSec int32 // ttl in seconds
- UserName string
- GroupNames []string
- SymlinkTarget string
-}
-
-func (attr Attr) IsDirectory() bool {
- return attr.Mode&os.ModeDir > 0
-}
-
-type Entry struct {
- FullPath
-
- Attr
- Extended map[string][]byte
-
- // the following is for files
- Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
-}
-
-func (entry *Entry) Size() uint64 {
- return TotalSize(entry.Chunks)
-}
-
-func (entry *Entry) Timestamp() time.Time {
- if entry.IsDirectory() {
- return entry.Crtime
- } else {
- return entry.Mtime
- }
-}
-
-func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
- if entry == nil {
- return nil
- }
- return &filer_pb.Entry{
- Name: entry.FullPath.Name(),
- IsDirectory: entry.IsDirectory(),
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- }
-}
-
-func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
- if entry == nil {
- return nil
- }
- dir, _ := entry.FullPath.DirAndName()
- return &filer_pb.FullEntry{
- Dir: dir,
- Entry: entry.ToProtoEntry(),
- }
-}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
deleted file mode 100644
index 3a2dc6134..000000000
--- a/weed/filer2/entry_codec.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package filer2
-
-import (
- "bytes"
- "fmt"
- "os"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
- message := &filer_pb.Entry{
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- }
- return proto.Marshal(message)
-}
-
-func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
-
- message := &filer_pb.Entry{}
-
- if err := proto.UnmarshalMerge(blob, message); err != nil {
- return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
- }
-
- entry.Attr = PbToEntryAttribute(message.Attributes)
-
- entry.Extended = message.Extended
-
- entry.Chunks = message.Chunks
-
- return nil
-}
-
-func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
-
- return &filer_pb.FuseAttributes{
- Crtime: entry.Attr.Crtime.Unix(),
- Mtime: entry.Attr.Mtime.Unix(),
- FileMode: uint32(entry.Attr.Mode),
- Uid: entry.Uid,
- Gid: entry.Gid,
- Mime: entry.Mime,
- Collection: entry.Attr.Collection,
- Replication: entry.Attr.Replication,
- TtlSec: entry.Attr.TtlSec,
- UserName: entry.Attr.UserName,
- GroupName: entry.Attr.GroupNames,
- SymlinkTarget: entry.Attr.SymlinkTarget,
- }
-}
-
-func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
-
- t := Attr{}
-
- t.Crtime = time.Unix(attr.Crtime, 0)
- t.Mtime = time.Unix(attr.Mtime, 0)
- t.Mode = os.FileMode(attr.FileMode)
- t.Uid = attr.Uid
- t.Gid = attr.Gid
- t.Mime = attr.Mime
- t.Collection = attr.Collection
- t.Replication = attr.Replication
- t.TtlSec = attr.TtlSec
- t.UserName = attr.UserName
- t.GroupNames = attr.GroupName
- t.SymlinkTarget = attr.SymlinkTarget
-
- return t
-}
-
-func EqualEntry(a, b *Entry) bool {
- if a == b {
- return true
- }
- if a == nil && b != nil || a != nil && b == nil {
- return false
- }
- if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) {
- return false
- }
- if len(a.Chunks) != len(b.Chunks) {
- return false
- }
-
- if !eq(a.Extended, b.Extended) {
- return false
- }
-
- for i := 0; i < len(a.Chunks); i++ {
- if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
- return false
- }
- }
- return true
-}
-
-func eq(a, b map[string][]byte) bool {
- if len(a) != len(b) {
- return false
- }
-
- for k, v := range a {
- if w, ok := b[k]; !ok || !bytes.Equal(v, w) {
- return false
- }
- }
-
- return true
-}
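[Note] A minimal round-trip sketch for the codec above. The blob carries only attributes, chunks, and extended metadata; the FullPath never travels in it, so a store must persist directory and name separately and restore them on read (values below are illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/filer2"
)

func main() {
	src := &filer2.Entry{
		FullPath: filer2.FullPath("/demo/file.txt"),
		Attr: filer2.Attr{
			Mtime: time.Now(),
			Mode:  0644,
			Uid:   1000,
		},
	}

	blob, err := src.EncodeAttributesAndChunks()
	if err != nil {
		panic(err)
	}

	dst := &filer2.Entry{FullPath: src.FullPath} // path restored by the caller
	if err := dst.DecodeAttributesAndChunks(blob); err != nil {
		panic(err)
	}

	// true: EqualEntry compares the protobuf forms, which keep whole seconds
	fmt.Println(filer2.EqualEntry(src, dst))
}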
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
deleted file mode 100644
index 2eb9e3e86..000000000
--- a/weed/filer2/etcd/etcd_store.go
+++ /dev/null
@@ -1,196 +0,0 @@
-package etcd
-
-import (
- "context"
- "fmt"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
- "go.etcd.io/etcd/clientv3"
-)
-
-const (
- DIR_FILE_SEPARATOR = byte(0x00)
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &EtcdStore{})
-}
-
-type EtcdStore struct {
- client *clientv3.Client
-}
-
-func (store *EtcdStore) GetName() string {
- return "etcd"
-}
-
-func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) {
- servers := configuration.GetString("servers")
- if servers == "" {
- servers = "localhost:2379"
- }
-
- timeout := configuration.GetString("timeout")
- if timeout == "" {
- timeout = "3s"
- }
-
- return store.initialize(servers, timeout)
-}
-
-func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
- glog.Infof("filer store etcd: %s", servers)
-
- to, err := time.ParseDuration(timeout)
- if err != nil {
- return fmt.Errorf("parse timeout %s: %s", timeout, err)
- }
-
- store.client, err = clientv3.New(clientv3.Config{
- Endpoints: strings.Split(servers, ","),
- DialTimeout: to,
- })
- if err != nil {
- return fmt.Errorf("connect to etcd %s: %s", servers, err)
- }
-
- return
-}
-
-func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- key := genKey(entry.DirAndName())
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- return nil
-}
-
-func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- key := genKey(fullpath.DirAndName())
-
- resp, err := store.client.Get(ctx, string(key))
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- if len(resp.Kvs) == 0 {
- return nil, filer2.ErrNotFound
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- key := genKey(fullpath.DirAndName())
-
- if _, err := store.client.Delete(ctx, string(key)); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
- return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *EtcdStore) ListDirectoryEntries(
- ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer2.Entry, err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- resp, err := store.client.Get(ctx, string(directoryPrefix),
- clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
-
- for _, kv := range resp.Kvs {
- fileName := getNameFromKey(kv.Key)
- if fileName == "" {
- continue
- }
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string) (key []byte) {
- key = []byte(dirPath)
- key = append(key, DIR_FILE_SEPARATOR)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = []byte(string(fullpath))
- keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func getNameFromKey(key []byte) string {
- sepIndex := len(key) - 1
- for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
- sepIndex--
- }
-
- return string(key[sepIndex+1:])
-}
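[Note] The etcd keyspace is flat: directory and file name are joined with a 0x00 separator, so listing a directory is a prefix scan on "<dir>\x00". The helpers are unexported, so the sketch below carries behaviorally equivalent local copies purely for illustration:

package main

import "fmt"

const sep = byte(0x00) // mirrors DIR_FILE_SEPARATOR above

// genKey and getNameFromKey are local copies of the unexported helpers in
// the etcd store.
func genKey(dir, name string) []byte {
	key := append([]byte(dir), sep)
	return append(key, []byte(name)...)
}

func getNameFromKey(key []byte) string {
	for i := len(key) - 1; i >= 0; i-- {
		if key[i] == sep {
			return string(key[i+1:])
		}
	}
	return string(key)
}

func main() {
	key := genKey("/home/user", "a.txt")
	fmt.Printf("%q\n", key)          // "/home/user\x00a.txt"
	fmt.Println(getNameFromKey(key)) // a.txt
}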
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
deleted file mode 100644
index b5876df82..000000000
--- a/weed/filer2/filechunks.go
+++ /dev/null
@@ -1,228 +0,0 @@
-package filer2
-
-import (
- "fmt"
- "hash/fnv"
- "sort"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
- for _, c := range chunks {
- t := uint64(c.Offset + int64(c.Size))
- if size < t {
- size = t
- }
- }
- return
-}
-
-func ETag(chunks []*filer_pb.FileChunk) (etag string) {
- if len(chunks) == 1 {
- return chunks[0].ETag
- }
-
- h := fnv.New32a()
- for _, c := range chunks {
- h.Write([]byte(c.ETag))
- }
- return fmt.Sprintf("%x", h.Sum32())
-}
-
-func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
-
- visibles := NonOverlappingVisibleIntervals(chunks)
-
- fileIds := make(map[string]bool)
- for _, interval := range visibles {
- fileIds[interval.fileId] = true
- }
- for _, chunk := range chunks {
- if _, found := fileIds[chunk.GetFileIdString()]; found {
- compacted = append(compacted, chunk)
- } else {
- garbage = append(garbage, chunk)
- }
- }
-
- return
-}
-
-func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
-
- fileIds := make(map[string]bool)
- for _, interval := range bs {
- fileIds[interval.GetFileIdString()] = true
- }
- for _, chunk := range as {
- if _, found := fileIds[chunk.GetFileIdString()]; !found {
- delta = append(delta, chunk)
- }
- }
-
- return
-}
-
-type ChunkView struct {
- FileId string
- Offset int64
- Size uint64
- LogicOffset int64
- IsFullChunk bool
-}
-
-func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
-
- visibles := NonOverlappingVisibleIntervals(chunks)
-
- return ViewFromVisibleIntervals(visibles, offset, size)
-
-}
-
-func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int) (views []*ChunkView) {
-
- stop := offset + int64(size)
-
- for _, chunk := range visibles {
- if chunk.start <= offset && offset < chunk.stop && offset < stop {
- isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
- views = append(views, &ChunkView{
- FileId: chunk.fileId,
- Offset: offset - chunk.start, // offset is the data starting location in this file id
- Size: uint64(min(chunk.stop, stop) - offset),
- LogicOffset: offset,
- IsFullChunk: isFullChunk,
- })
- offset = min(chunk.stop, stop)
- }
- }
-
- return views
-
-}
-
-func logPrintf(name string, visibles []VisibleInterval) {
- /*
- log.Printf("%s len %d", name, len(visibles))
- for _, v := range visibles {
- log.Printf("%s: => %+v", name, v)
- }
- */
-}
-
-var bufPool = sync.Pool{
- New: func() interface{} {
- return new(VisibleInterval)
- },
-}
-
-func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
-
- newV := newVisibleInterval(
- chunk.Offset,
- chunk.Offset+int64(chunk.Size),
- chunk.GetFileIdString(),
- chunk.Mtime,
- true,
- )
-
- length := len(visibles)
- if length == 0 {
- return append(visibles, newV)
- }
- last := visibles[length-1]
- if last.stop <= chunk.Offset {
- return append(visibles, newV)
- }
-
- logPrintf(" before", visibles)
- for _, v := range visibles {
- if v.start < chunk.Offset && chunk.Offset < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(
- v.start,
- chunk.Offset,
- v.fileId,
- v.modifiedTime,
- false,
- ))
- }
- chunkStop := chunk.Offset + int64(chunk.Size)
- if v.start < chunkStop && chunkStop < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(
- chunkStop,
- v.stop,
- v.fileId,
- v.modifiedTime,
- false,
- ))
- }
- if chunkStop <= v.start || v.stop <= chunk.Offset {
- newVisibles = append(newVisibles, v)
- }
- }
- newVisibles = append(newVisibles, newV)
-
- logPrintf(" append", newVisibles)
-
- for i := len(newVisibles) - 1; i >= 0; i-- {
- if i > 0 && newV.start < newVisibles[i-1].start {
- newVisibles[i] = newVisibles[i-1]
- } else {
- newVisibles[i] = newV
- break
- }
- }
- logPrintf(" sorted", newVisibles)
-
- return newVisibles
-}
-
-func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
-
- sort.Slice(chunks, func(i, j int) bool {
- return chunks[i].Mtime < chunks[j].Mtime
- })
-
- var newVisibles []VisibleInterval
- for _, chunk := range chunks {
- newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
- t := visibles[:0]
- visibles = newVisibles
- newVisibles = t
-
- logPrintf("add", visibles)
-
- }
-
- return
-}
-
-// find non-overlapping visible intervals
-// visible interval map to one file chunk
-
-type VisibleInterval struct {
- start int64
- stop int64
- modifiedTime int64
- fileId string
- isFullChunk bool
-}
-
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) VisibleInterval {
- return VisibleInterval{
- start: start,
- stop: stop,
- fileId: fileId,
- modifiedTime: modifiedTime,
- isFullChunk: isFullChunk,
- }
-}
-
-func min(x, y int64) int64 {
- if x <= y {
- return x
- }
- return y
-}
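[Note] A small worked example of the interval logic above: a newer 50-byte write at offset 0 shadows the head of an older 100-byte chunk, so reading the first 100 bytes resolves into two views, the newer chunk first (this mirrors test case 2 in the test file that follows):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func main() {
	chunks := []*filer_pb.FileChunk{
		{Offset: 0, Size: 100, FileId: "old", Mtime: 100},
		{Offset: 0, Size: 50, FileId: "new", Mtime: 200}, // newer write wins
	}
	for _, view := range filer2.ViewFromChunks(chunks, 0, 100) {
		fmt.Printf("fileId=%s size=%d logicOffset=%d\n",
			view.FileId, view.Size, view.LogicOffset)
	}
	// fileId=new size=50 logicOffset=0
	// fileId=old size=50 logicOffset=50
}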
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
deleted file mode 100644
index e75e60753..000000000
--- a/weed/filer2/filechunks_test.go
+++ /dev/null
@@ -1,384 +0,0 @@
-package filer2
-
-import (
- "log"
- "testing"
-
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func TestCompactFileChunks(t *testing.T) {
- chunks := []*filer_pb.FileChunk{
- {Offset: 10, Size: 100, FileId: "abc", Mtime: 50},
- {Offset: 100, Size: 100, FileId: "def", Mtime: 100},
- {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200},
- {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
- }
-
- compacted, garbage := CompactFileChunks(chunks)
-
- if len(compacted) != 3 {
- t.Fatalf("unexpected compacted: %d", len(compacted))
- }
- if len(garbage) != 1 {
- t.Fatalf("unexpected garbage: %d", len(garbage))
- }
-
-}
-
-func TestCompactFileChunks2(t *testing.T) {
-
- chunks := []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 50},
- {Offset: 100, Size: 100, FileId: "def", Mtime: 100},
- {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200},
- {Offset: 0, Size: 100, FileId: "abcf", Mtime: 300},
- {Offset: 50, Size: 100, FileId: "fhfh", Mtime: 400},
- {Offset: 100, Size: 100, FileId: "yuyu", Mtime: 500},
- }
-
- k := 3
-
- for n := 0; n < k; n++ {
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n),
- })
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k),
- })
- }
-
- compacted, garbage := CompactFileChunks(chunks)
-
- if len(compacted) != 4 {
- t.Fatalf("unexpected compacted: %d", len(compacted))
- }
- if len(garbage) != 8 {
- t.Fatalf("unexpected garbage: %d", len(garbage))
- }
-}
-
-func TestIntervalMerging(t *testing.T) {
-
- testcases := []struct {
- Chunks []*filer_pb.FileChunk
- Expected []*VisibleInterval
- }{
- // case 0: normal
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
- {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 100, fileId: "abc"},
- {start: 100, stop: 200, fileId: "asdf"},
- {start: 200, stop: 300, fileId: "fsad"},
- },
- },
- // case 1: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "asdf"},
- },
- },
- // case 2: updates overwrite part of previous chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 50, fileId: "asdf"},
- {start: 50, stop: 100, fileId: "abc"},
- },
- },
- // case 3: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 50, fileId: "asdf"},
- {start: 50, stop: 300, fileId: "xxxx"},
- },
- },
- // case 4: updates far away from prev chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "asdf"},
- {start: 250, stop: 500, fileId: "xxxx"},
- },
- },
- // case 5: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184},
- {Offset: 70, Size: 150, FileId: "abc", Mtime: 143},
- {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "asdf"},
- {start: 200, stop: 220, fileId: "abc"},
- },
- },
- // case 6: same updates
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 100, fileId: "abc"},
- },
- },
- // case 7: real updates
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123},
- {Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130},
- {Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140},
- {Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150},
- {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160},
- {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 2097152, fileId: "3,029565bf3092"},
- {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"},
- {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"},
- {start: 8388608, stop: 11534336, fileId: "5,02982f80de50"},
- {start: 11534336, stop: 14376529, fileId: "7,0299ad723803"},
- },
- },
- // case 8: real bug
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 77824, FileId: "4,0b3df938e301", Mtime: 123},
- {Offset: 471040, Size: 472225 - 471040, FileId: "6,0b3e0650019c", Mtime: 130},
- {Offset: 77824, Size: 208896 - 77824, FileId: "4,0b3f0c7202f0", Mtime: 140},
- {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150},
- {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 77824, fileId: "4,0b3df938e301"},
- {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"},
- {start: 208896, stop: 339968, fileId: "2,0b4031a72689"},
- {start: 339968, stop: 471040, fileId: "3,0b416a557362"},
- {start: 471040, stop: 472225, fileId: "6,0b3e0650019c"},
- },
- },
- }
-
- for i, testcase := range testcases {
- log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
- intervals := NonOverlappingVisibleIntervals(testcase.Chunks)
- for x, interval := range intervals {
- log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
- i, x, interval.start, interval.stop, interval.fileId)
- }
- for x, interval := range intervals {
- if interval.start != testcase.Expected[x].start {
- t.Fatalf("failed on test case %d, interval %d, start %d, expect %d",
- i, x, interval.start, testcase.Expected[x].start)
- }
- if interval.stop != testcase.Expected[x].stop {
- t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d",
- i, x, interval.stop, testcase.Expected[x].stop)
- }
- if interval.fileId != testcase.Expected[x].fileId {
- t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
- i, x, interval.fileId, testcase.Expected[x].fileId)
- }
- }
- if len(intervals) != len(testcase.Expected) {
- t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
- }
-
- }
-
-}
-
-func TestChunksReading(t *testing.T) {
-
- testcases := []struct {
- Chunks []*filer_pb.FileChunk
- Offset int64
- Size int
- Expected []*ChunkView
- }{
- // case 0: normal
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
- {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
- },
- Offset: 0,
- Size: 250,
- Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
- {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200},
- },
- },
- // case 1: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- },
- Offset: 50,
- Size: 100,
- Expected: []*ChunkView{
- {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50},
- },
- },
- // case 2: updates overwrite part of previous chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134},
- },
- Offset: 25,
- Size: 50,
- Expected: []*ChunkView{
- {Offset: 25, Size: 25, FileId: "asdf", LogicOffset: 25},
- {Offset: 0, Size: 25, FileId: "abc", LogicOffset: 50},
- },
- },
- // case 3: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Offset: 0,
- Size: 200,
- Expected: []*ChunkView{
- {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50},
- },
- },
- // case 4: updates far away from prev chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Offset: 0,
- Size: 400,
- Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
- // {Offset: 0, Size: 150, FileId: "xxxx"}, // missing intervals should not happen
- },
- },
- // case 5: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184},
- {Offset: 70, Size: 150, FileId: "abc", Mtime: 143},
- {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
- },
- Offset: 0,
- Size: 220,
- Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 20, FileId: "abc", LogicOffset: 200},
- },
- },
- // case 6: same updates
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- },
- Offset: 0,
- Size: 100,
- Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- },
- },
- // case 7: edge cases
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
- {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
- },
- Offset: 0,
- Size: 200,
- Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
- },
- },
- }
-
- for i, testcase := range testcases {
- log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
- chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
- for x, chunk := range chunks {
- log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
- i, x, chunk.Offset, chunk.Size, chunk.FileId)
- if chunk.Offset != testcase.Expected[x].Offset {
- t.Fatalf("failed on read case %d, chunk %d, Offset %d, expect %d",
- i, x, chunk.Offset, testcase.Expected[x].Offset)
- }
- if chunk.Size != testcase.Expected[x].Size {
- t.Fatalf("failed on read case %d, chunk %d, Size %d, expect %d",
- i, x, chunk.Size, testcase.Expected[x].Size)
- }
- if chunk.FileId != testcase.Expected[x].FileId {
- t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
- i, x, chunk.FileId, testcase.Expected[x].FileId)
- }
- if chunk.LogicOffset != testcase.Expected[x].LogicOffset {
- t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d",
- i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset)
- }
- }
- if len(chunks) != len(testcase.Expected) {
- t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected))
- }
- }
-
-}
-
-func BenchmarkCompactFileChunks(b *testing.B) {
-
- var chunks []*filer_pb.FileChunk
-
- k := 1024
-
- for n := 0; n < k; n++ {
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n),
- })
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k),
- })
- }
-
- for n := 0; n < b.N; n++ {
- CompactFileChunks(chunks)
- }
-}
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
deleted file mode 100644
index b724e20fd..000000000
--- a/weed/filer2/filer.go
+++ /dev/null
@@ -1,253 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "os"
- "path/filepath"
- "strings"
- "time"
-
- "google.golang.org/grpc"
-
- "github.com/karlseguin/ccache"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-const PaginationSize = 1024 * 256
-
-var (
- OS_UID = uint32(os.Getuid())
- OS_GID = uint32(os.Getgid())
-)
-
-type Filer struct {
- store *FilerStoreWrapper
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
- fileIdDeletionChan chan string
- GrpcDialOption grpc.DialOption
-}
-
-func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
- f := &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
- fileIdDeletionChan: make(chan string, PaginationSize),
- GrpcDialOption: grpcDialOption,
- }
-
- go f.loopProcessingDeletion()
-
- return f
-}
-
-func (f *Filer) SetStore(store FilerStore) {
- f.store = NewFilerStoreWrapper(store)
-}
-
-func (f *Filer) DisableDirectoryCache() {
- f.directoryCache = nil
-}
-
-func (fs *Filer) GetMaster() string {
- return fs.MasterClient.GetMaster()
-}
-
-func (fs *Filer) KeepConnectedToMaster() {
- fs.MasterClient.KeepConnectedToMaster()
-}
-
-func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
- return f.store.BeginTransaction(ctx)
-}
-
-func (f *Filer) CommitTransaction(ctx context.Context) error {
- return f.store.CommitTransaction(ctx)
-}
-
-func (f *Filer) RollbackTransaction(ctx context.Context) error {
- return f.store.RollbackTransaction(ctx)
-}
-
-func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
-
- if string(entry.FullPath) == "/" {
- return nil
- }
-
- dirParts := strings.Split(string(entry.FullPath), "/")
-
- // fmt.Printf("directory parts: %+v\n", dirParts)
-
- var lastDirectoryEntry *Entry
-
- for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...))
- // fmt.Printf("%d directory: %+v\n", i, dirPath)
-
- // first check local cache
- dirEntry := f.cacheGetDirectory(dirPath)
-
- // not found, check the store directly
- if dirEntry == nil {
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
- } else {
- glog.V(4).Infof("found cached directory: %s", dirPath)
- }
-
- // no such existing directory
- if dirEntry == nil {
-
- // create the directory
- now := time.Now()
-
- dirEntry = &Entry{
- FullPath: FullPath(dirPath),
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0770,
- Uid: entry.Uid,
- Gid: entry.Gid,
- },
- }
-
- glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.store.InsertEntry(ctx, dirEntry)
- if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
- return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
- }
- } else {
- f.NotifyUpdateEvent(nil, dirEntry, false)
- }
-
- } else if !dirEntry.IsDirectory() {
- return fmt.Errorf("%s is a file", dirPath)
- }
-
- // cache the directory entry
- f.cacheSetDirectory(dirPath, dirEntry, i)
-
- // remember the direct parent directory entry
- if i == len(dirParts)-1 {
- lastDirectoryEntry = dirEntry
- }
-
- }
-
- if lastDirectoryEntry == nil {
- return fmt.Errorf("parent folder not found: %v", entry.FullPath)
- }
-
- /*
- if !hasWritePermission(lastDirectoryEntry, entry) {
- glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
- lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
- return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
- }
- */
-
- oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
-
- if oldEntry == nil {
- if err := f.store.InsertEntry(ctx, entry); err != nil {
- glog.Errorf("insert entry %s: %v", entry.FullPath, err)
- return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
- }
- } else {
- if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
- glog.Errorf("update entry %s: %v", entry.FullPath, err)
- return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
- }
- }
-
- f.NotifyUpdateEvent(oldEntry, entry, true)
-
- f.deleteChunksIfNotNew(oldEntry, entry)
-
- return nil
-}
-
-func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
- if oldEntry != nil {
- if oldEntry.IsDirectory() && !entry.IsDirectory() {
- glog.Errorf("existing %s is a directory", entry.FullPath)
- return fmt.Errorf("existing %s is a directory", entry.FullPath)
- }
- if !oldEntry.IsDirectory() && entry.IsDirectory() {
- glog.Errorf("existing %s is a file", entry.FullPath)
- return fmt.Errorf("existing %s is a file", entry.FullPath)
- }
- }
- return f.store.UpdateEntry(ctx, entry)
-}
-
-func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
-
- now := time.Now()
-
- if string(p) == "/" {
- return &Entry{
- FullPath: p,
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0755,
- Uid: OS_UID,
- Gid: OS_GID,
- },
- }, nil
- }
- return f.store.FindEntry(ctx, p)
-}
-
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
- if strings.HasSuffix(string(p), "/") && len(p) > 1 {
- p = p[0 : len(p)-1]
- }
- return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
-}
-
-func (f *Filer) cacheDelDirectory(dirpath string) {
-
- if dirpath == "/" {
- return
- }
-
- if f.directoryCache == nil {
- return
- }
- f.directoryCache.Delete(dirpath)
- return
-}
-
-func (f *Filer) cacheGetDirectory(dirpath string) *Entry {
-
- if f.directoryCache == nil {
- return nil
- }
- item := f.directoryCache.Get(dirpath)
- if item == nil {
- return nil
- }
- return item.Value().(*Entry)
-}
-
-func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
-
- if f.directoryCache == nil {
- return
- }
-
- minutes := 60
- if level < 10 {
- minutes -= level * 6
- }
-
- f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
-}
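[Note] CreateEntry above also behaves like "mkdir -p": every missing ancestor is created as a directory with mode os.ModeDir|0770 and the entry's uid/gid before the entry itself is inserted (or updated in place if it already exists). A hedged usage sketch (createDeep is a hypothetical helper, and the Filer is assumed to be wired to a store and masters):

package example

import (
	"context"
	"time"

	"github.com/chrislusf/seaweedfs/weed/filer2"
)

// createDeep inserts a file entry several levels deep; CreateEntry creates
// /buckets, /buckets/demo, /buckets/demo/a and /buckets/demo/a/b on the way.
func createDeep(f *filer2.Filer) error {
	now := time.Now()
	return f.CreateEntry(context.Background(), &filer2.Entry{
		FullPath: filer2.FullPath("/buckets/demo/a/b/c.txt"),
		Attr: filer2.Attr{
			Mtime:  now,
			Crtime: now,
			Mode:   0644,
			Uid:    filer2.OS_UID,
			Gid:    filer2.OS_GID,
		},
	})
}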
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
deleted file mode 100644
index 1a10f7c20..000000000
--- a/weed/filer2/filer_client_util.go
+++ /dev/null
@@ -1,172 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "io"
- "math"
- "strings"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func VolumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
-}
-
-type FilerClient interface {
- WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error
-}
-
-func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
- var vids []string
- for _, chunkView := range chunkViews {
- vids = append(vids, VolumeId(chunkView.FileId))
- }
-
- vid2Locations := make(map[string]*filer_pb.Locations)
-
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return err
- }
-
- vid2Locations = resp.LocationsMap
-
- return nil
- })
-
- if err != nil {
- return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
- }
-
- var wg sync.WaitGroup
- for _, chunkView := range chunkViews {
- wg.Add(1)
- go func(chunkView *ChunkView) {
- defer wg.Done()
-
- glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
-
- locations := vid2Locations[VolumeId(chunkView.FileId)]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", chunkView.FileId)
- err = fmt.Errorf("failed to locate %s", chunkView.FileId)
- return
- }
-
- var n int64
- n, err = util.ReadUrl(
- fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
- chunkView.Offset,
- int(chunkView.Size),
- buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
- !chunkView.IsFullChunk)
-
- if err != nil {
-
- glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
-
- err = fmt.Errorf("failed to read http://%s/%s: %v",
- locations.Locations[0].Url, chunkView.FileId, err)
- return
- }
-
- glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
- totalRead += n
-
- }(chunkView)
- }
- wg.Wait()
- return
-}
-
-func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) {
-
- dir, name := FullPath(fullFilePath).DirAndName()
-
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- }
-
- glog.V(3).Infof("read %s request: %v", fullFilePath, request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
- return nil
- }
- glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err)
- return err
- }
-
- if resp.Entry == nil {
- glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
- return nil
- }
-
- entry = resp.Entry
- return nil
- })
-
- return
-}
-
-func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
-
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- lastEntryName := ""
-
- request := &filer_pb.ListEntriesRequest{
- Directory: fullDirPath,
- Prefix: prefix,
- StartFromFileName: lastEntryName,
- Limit: math.MaxUint32,
- }
-
- glog.V(3).Infof("read directory: %v", request)
- stream, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
-
- var prevEntry *filer_pb.Entry
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- if prevEntry != nil {
- fn(prevEntry, true)
- }
- break
- } else {
- return recvErr
- }
- }
- if prevEntry != nil {
- fn(prevEntry, false)
- }
- prevEntry = resp.Entry
- }
-
- return nil
-
- })
-
- return
-}
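[Note] VolumeId above relies on the "volumeId,fileKey" shape of SeaweedFS file ids: everything after the last comma is dropped, and an id without a comma is returned unchanged. A quick demonstration (the ids are illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer2"
)

func main() {
	fmt.Println(filer2.VolumeId("3,01637037d6")) // 3
	fmt.Println(filer2.VolumeId("no-comma"))     // no-comma (returned as-is)
}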
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
deleted file mode 100644
index 75a09e7ef..000000000
--- a/weed/filer2/filer_delete_entry.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
- if p == "/" {
- return nil
- }
-
- entry, findErr := f.FindEntry(ctx, p)
- if findErr != nil {
- return findErr
- }
-
- var chunks []*filer_pb.FileChunk
- chunks = append(chunks, entry.Chunks...)
- if entry.IsDirectory() {
- // delete the folder children, not including the folder itself
- var dirChunks []*filer_pb.FileChunk
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
- if err != nil {
- return fmt.Errorf("delete directory %s: %v", p, err)
- }
- chunks = append(chunks, dirChunks...)
- f.cacheDelDirectory(string(p))
- }
- // delete the file or folder
- err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
- if err != nil {
- return fmt.Errorf("delete file %s: %v", p, err)
- }
-
- if shouldDeleteChunks {
- go f.DeleteChunks(chunks)
- }
-
- return nil
-}
-
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
-
- lastFileName := ""
- includeLastFile := false
- for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
- if err != nil {
- glog.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
- }
- if lastFileName == "" && !isRecursive && len(entries) > 0 {
- // only for first iteration in the loop
- return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
- }
-
- for _, sub := range entries {
- lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- if sub.IsDirectory() {
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
- }
- if err != nil && !ignoreRecursiveError {
- return nil, err
- }
- if shouldDeleteChunks {
- chunks = append(chunks, dirChunks...)
- }
- }
-
- if len(entries) < PaginationSize {
- break
- }
- }
-
- f.cacheDelDirectory(string(entry.FullPath))
-
- glog.V(3).Infof("deleting directory %v", entry.FullPath)
-
- if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
- return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
-
- return chunks, nil
-}
-
-func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
-
- glog.V(3).Infof("deleting entry %v", entry.FullPath)
-
- if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
- return fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
-
- return nil
-}
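[Note] A hedged usage sketch of the recursive delete above (removeTree is a hypothetical helper): the flags select recursion into children, abort on the first child error, and physical deletion of the collected chunks.

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer2"
)

func removeTree(f *filer2.Filer) error {
	return f.DeleteEntryMetaAndData(
		context.Background(),
		filer2.FullPath("/tmp/scratch"),
		true,  // isRecursive: delete folder children as well
		false, // ignoreRecursiveError: stop on the first failure
		true,  // shouldDeleteChunks: enqueue chunk file ids for deletion
	)
}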
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
deleted file mode 100644
index 9937685f7..000000000
--- a/weed/filer2/filer_deletion.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package filer2
-
-import (
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (f *Filer) loopProcessingDeletion() {
-
- ticker := time.NewTicker(5 * time.Second)
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
- m := make(map[string]operation.LookupResult)
- for _, vid := range vids {
- locs, _ := f.MasterClient.GetVidLocations(vid)
- var locations []operation.Location
- for _, loc := range locs {
- locations = append(locations, operation.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- })
- }
- m[vid] = operation.LookupResult{
- VolumeId: vid,
- Locations: locations,
- }
- }
- return m, nil
- }
-
- var fileIds []string
- for {
- select {
- case fid := <-f.fileIdDeletionChan:
- fileIds = append(fileIds, fid)
- if len(fileIds) >= 4096 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
- }
- }
- }
-}
-
-func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
- for _, chunk := range chunks {
- f.fileIdDeletionChan <- chunk.GetFileIdString()
- }
-}
-
-// DeleteFileByFileId direct delete by file id.
-// Only used when the fileId is not being managed by snapshots.
-func (f *Filer) DeleteFileByFileId(fileId string) {
- f.fileIdDeletionChan <- fileId
-}
-
-func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
-
- if oldEntry == nil {
- return
- }
- if newEntry == nil {
- f.DeleteChunks(oldEntry.Chunks)
- return
- }
-
- var toDelete []*filer_pb.FileChunk
- newChunkIds := make(map[string]bool)
- for _, newChunk := range newEntry.Chunks {
- newChunkIds[newChunk.GetFileIdString()] = true
- }
-
- for _, oldChunk := range oldEntry.Chunks {
- if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
- toDelete = append(toDelete, oldChunk)
- }
- }
- f.DeleteChunks(toDelete)
-}
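
loopProcessingDeletion above is a batch-or-flush loop: file ids accumulate from a channel and are flushed either when the batch reaches 4096 entries or when the 5-second ticker fires. A self-contained sketch of that select pattern (smaller numbers, plus a deadline so the demo terminates):

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string, 16)
	go func() {
		for i := 0; i < 5; i++ {
			ch <- fmt.Sprintf("fid-%d", i)
		}
	}()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	var batch []string
	flush := func(reason string) {
		if len(batch) == 0 {
			return
		}
		fmt.Println(reason, "flush of", len(batch), "ids")
		batch = batch[:0]
	}

	for deadline := time.After(350 * time.Millisecond); ; {
		select {
		case id := <-ch:
			batch = append(batch, id)
			if len(batch) >= 4 { // size threshold, like the 4096 above
				flush("size")
			}
		case <-ticker.C: // periodic flush, like the 5-second ticker above
			flush("tick")
		case <-deadline: // demo-only exit; the real loop runs forever
			flush("final")
			return
		}
	}
}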
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
deleted file mode 100644
index c37381116..000000000
--- a/weed/filer2/filer_notify.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package filer2
-
-import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/notification"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) {
- var key string
- if oldEntry != nil {
- key = string(oldEntry.FullPath)
- } else if newEntry != nil {
- key = string(newEntry.FullPath)
- } else {
- return
- }
-
- if notification.Queue != nil {
-
- glog.V(3).Infof("notifying entry update %v", key)
-
- newParentPath := ""
- if newEntry != nil {
- newParentPath, _ = newEntry.FullPath.DirAndName()
- }
-
- notification.Queue.SendMessage(
- key,
- &filer_pb.EventNotification{
- OldEntry: oldEntry.ToProtoEntry(),
- NewEntry: newEntry.ToProtoEntry(),
- DeleteChunks: deleteChunks,
- NewParentPath: newParentPath,
- },
- )
-
- }
-}
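
Note that NotifyUpdateEvent calls oldEntry.ToProtoEntry() even when oldEntry may be nil; that is only safe because ToProtoEntry (defined with this package's entry codec) appears to guard against a nil receiver. A sketch of that nil-receiver idiom, with hypothetical Entry/PbEntry types:

package main

import "fmt"

type Entry struct{ Name string }

type PbEntry struct{ Name string }

// ToProto is nil-safe: calling a method on a nil pointer receiver is legal in
// Go, provided the method guards its dereferences (guard assumed to mirror
// filer2's ToProtoEntry).
func (e *Entry) ToProto() *PbEntry {
	if e == nil {
		return nil
	}
	return &PbEntry{Name: e.Name}
}

func main() {
	var missing *Entry
	fmt.Println(missing.ToProto() == nil) // true, no panic
}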
diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go
deleted file mode 100644
index b74e2ad35..000000000
--- a/weed/filer2/filer_notify_test.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package filer2
-
-import (
- "testing"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/golang/protobuf/proto"
-)
-
-func TestProtoMarshalText(t *testing.T) {
-
- oldEntry := &Entry{
- FullPath: FullPath("/this/path/to"),
- Attr: Attr{
- Mtime: time.Now(),
- Mode: 0644,
- Uid: 1,
- Mime: "text/json",
- TtlSec: 25,
- },
- Chunks: []*filer_pb.FileChunk{
- &filer_pb.FileChunk{
- FileId: "234,2423423422",
- Offset: 234234,
- Size: 234,
- Mtime: 12312423,
- ETag: "2342342354",
- SourceFileId: "23234,2342342342",
- },
- },
- }
-
- notification := &filer_pb.EventNotification{
- OldEntry: oldEntry.ToProtoEntry(),
- NewEntry: nil,
- DeleteChunks: true,
- }
-
- text := proto.MarshalTextString(notification)
-
- notification2 := &filer_pb.EventNotification{}
- proto.UnmarshalText(text, notification2)
-
- if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId {
- t.Fatalf("marshal/unmarshal error: %s", text)
- }
-
- println(text)
-
-}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
deleted file mode 100644
index 0bb0bd611..000000000
--- a/weed/filer2/filerstore.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package filer2
-
-import (
- "context"
- "errors"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type FilerStore interface {
- // GetName gets the name to locate the configuration in filer.toml file
- GetName() string
- // Initialize initializes the file store
- Initialize(configuration util.Configuration) error
- InsertEntry(context.Context, *Entry) error
- UpdateEntry(context.Context, *Entry) (err error)
- // err == filer2.ErrNotFound if not found
- FindEntry(context.Context, FullPath) (entry *Entry, err error)
- DeleteEntry(context.Context, FullPath) (err error)
- DeleteFolderChildren(context.Context, FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
-
- BeginTransaction(ctx context.Context) (context.Context, error)
- CommitTransaction(ctx context.Context) error
- RollbackTransaction(ctx context.Context) error
-}
-
-var ErrNotFound = errors.New("filer: no entry is found in filer store")
-
-type FilerStoreWrapper struct {
- actualStore FilerStore
-}
-
-func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
- if innerStore, ok := store.(*FilerStoreWrapper); ok {
- return innerStore
- }
- return &FilerStoreWrapper{
- actualStore: store,
- }
-}
-
-func (fsw *FilerStoreWrapper) GetName() string {
- return fsw.actualStore.GetName()
-}
-
-func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error {
- return fsw.actualStore.Initialize(configuration)
-}
-
-func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
- stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- return fsw.actualStore.InsertEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
- stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- return fsw.actualStore.UpdateEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
- }()
-
- entry, err = fsw.actualStore.FindEntry(ctx, fp)
- if err != nil {
- return nil, err
- }
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- return
-}
-
-func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
- }()
-
- return fsw.actualStore.DeleteEntry(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
- }()
-
- return fsw.actualStore.DeleteFolderChildren(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
- }()
-
- entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, err
-}
-
-func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
- return fsw.actualStore.BeginTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
- return fsw.actualStore.CommitTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
- return fsw.actualStore.RollbackTransaction(ctx)
-}
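
FilerStoreWrapper is a plain decorator: it satisfies FilerStore itself, forwards every call to the wrapped store, and records a counter plus a latency observation around each one. A minimal sketch of the same pattern, with a hypothetical one-method Store interface and a call counter standing in for the Prometheus metrics:

package main

import (
	"fmt"
	"time"
)

type Store interface {
	Find(name string) (string, error)
}

type mapStore map[string]string

func (m mapStore) Find(name string) (string, error) {
	v, ok := m[name]
	if !ok {
		return "", fmt.Errorf("not found: %s", name)
	}
	return v, nil
}

// instrumented forwards every call to the inner Store, counting and timing
// it, the way FilerStoreWrapper wraps the actual store.
type instrumented struct {
	inner Store
	finds int
}

func (s *instrumented) Find(name string) (string, error) {
	s.finds++
	start := time.Now()
	defer func() { fmt.Println("find took", time.Since(start)) }()
	return s.inner.Find(name)
}

func main() {
	store := &instrumented{inner: mapStore{"a": "1"}}
	v, err := store.Find("a")
	fmt.Println(v, err, store.finds)
}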
diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go
deleted file mode 100644
index 191e51cf3..000000000
--- a/weed/filer2/fullpath.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package filer2
-
-import (
- "path/filepath"
- "strings"
-)
-
-type FullPath string
-
-func NewFullPath(dir, name string) FullPath {
- return FullPath(dir).Child(name)
-}
-
-func (fp FullPath) DirAndName() (string, string) {
- dir, name := filepath.Split(string(fp))
- if dir == "/" {
- return dir, name
- }
- if len(dir) < 1 {
- return "/", ""
- }
- return dir[:len(dir)-1], name
-}
-
-func (fp FullPath) Name() string {
- _, name := filepath.Split(string(fp))
- return name
-}
-
-func (fp FullPath) Child(name string) FullPath {
- dir := string(fp)
- if strings.HasSuffix(dir, "/") {
- return FullPath(dir + name)
- }
- return FullPath(dir + "/" + name)
-}
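
A small example test for the helpers above (a sketch; it assumes it sits in package filer2 next to this file, with "fmt" imported):

// Assumes package filer2, alongside fullpath.go.
func ExampleFullPath() {
	fp := NewFullPath("/home/chris", "file1.jpg")
	dir, name := fp.DirAndName()
	fmt.Println(dir)
	fmt.Println(name)
	fmt.Println(fp.Name())
	fmt.Println(FullPath("/").Child("f"))
	// Output:
	// /home/chris
	// file1.jpg
	// file1.jpg
	// /f
}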
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
deleted file mode 100644
index 4952b3b3a..000000000
--- a/weed/filer2/leveldb/leveldb_store.go
+++ /dev/null
@@ -1,217 +0,0 @@
-package leveldb
-
-import (
- "bytes"
- "context"
- "fmt"
-
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/opt"
- leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const (
- DIR_FILE_SEPARATOR = byte(0x00)
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &LevelDBStore{})
-}
-
-type LevelDBStore struct {
- db *leveldb.DB
-}
-
-func (store *LevelDBStore) GetName() string {
- return "leveldb"
-}
-
-func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) {
- dir := configuration.GetString("dir")
- return store.initialize(dir)
-}
-
-func (store *LevelDBStore) initialize(dir string) (err error) {
- glog.Infof("filer store dir: %s", dir)
- if err := weed_util.TestFolderWritable(dir); err != nil {
- return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
- }
-
- opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10,
- }
-
- if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
- glog.Infof("filer store open dir %s: %v", dir, err)
- return
- }
- return
-}
-
-func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *LevelDBStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- key := genKey(entry.DirAndName())
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- err = store.db.Put(key, value, nil)
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
-
- return nil
-}
-
-func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- key := genKey(fullpath.DirAndName())
-
- data, err := store.db.Get(key, nil)
-
- if err == leveldb.ErrNotFound {
- return nil, filer2.ErrNotFound
- }
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
-
- return entry, nil
-}
-
-func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- key := genKey(fullpath.DirAndName())
-
- err = store.db.Delete(key, nil)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
-
- batch := new(leveldb.Batch)
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
- iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- batch.Delete(genKey(string(fullpath), fileName))
- }
- iter.Release()
-
- err = store.db.Write(batch, nil)
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
- iter.Release()
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string) (key []byte) {
- key = []byte(dirPath)
- key = append(key, DIR_FILE_SEPARATOR)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = []byte(string(fullpath))
- keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func getNameFromKey(key []byte) string {
-
- sepIndex := len(key) - 1
- for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
- sepIndex--
- }
-
- return string(key[sepIndex+1:])
-
-}
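
The reason for DIR_FILE_SEPARATOR: keys are dir + 0x00 + name, and because the zero byte sorts before every path byte, a directory's children form one contiguous key range that a prefix scan can walk without ever matching a sibling such as /homework while listing /home. A self-contained sketch:

package main

import (
	"fmt"
	"sort"
	"strings"
)

const sep = "\x00" // DIR_FILE_SEPARATOR

func key(dir, name string) string { return dir + sep + name }

func main() {
	keys := []string{
		key("/home", "a.txt"),
		key("/home", "b.txt"),
		key("/homework", "c.txt"), // shares a byte prefix with /home
	}
	sort.Strings(keys) // LevelDB iterates in this sorted order
	prefix := "/home" + sep
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			fmt.Printf("%q\n", strings.TrimPrefix(k, prefix))
		}
	}
	// prints "a.txt" and "b.txt"; /homework is excluded by the zero byte
}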
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
deleted file mode 100644
index 904de8c97..000000000
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package leveldb
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "io/ioutil"
- "os"
- "testing"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
- store := &LevelDBStore{}
- store.initialize(dir)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
- // checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
-
-func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
- store := &LevelDBStore{}
- store.initialize(dir)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- // checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
- if len(entries) != 0 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
deleted file mode 100644
index 8a16822ab..000000000
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ /dev/null
@@ -1,237 +0,0 @@
-package leveldb
-
-import (
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "io"
- "os"
-
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/opt"
- leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &LevelDB2Store{})
-}
-
-type LevelDB2Store struct {
- dbs []*leveldb.DB
- dbCount int
-}
-
-func (store *LevelDB2Store) GetName() string {
- return "leveldb2"
-}
-
-func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) {
- dir := configuration.GetString("dir")
- return store.initialize(dir, 8)
-}
-
-func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
- glog.Infof("filer store leveldb2 dir: %s", dir)
- if err := weed_util.TestFolderWritable(dir); err != nil {
- return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
- }
-
- opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- }
-
- for d := 0; d < dbCount; d++ {
- dbFolder := fmt.Sprintf("%s/%02d", dir, d)
- os.MkdirAll(dbFolder, 0755)
- db, dbErr := leveldb.OpenFile(dbFolder, opts)
- if dbErr != nil {
- glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
- return dbErr
- }
- store.dbs = append(store.dbs, db)
- }
- store.dbCount = dbCount
-
- return
-}
-
-func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- dir, name := entry.DirAndName()
- key, partitionId := genKey(dir, name, store.dbCount)
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- err = store.dbs[partitionId].Put(key, value, nil)
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
-
- return nil
-}
-
-func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- dir, name := fullpath.DirAndName()
- key, partitionId := genKey(dir, name, store.dbCount)
-
- data, err := store.dbs[partitionId].Get(key, nil)
-
- if err == leveldb.ErrNotFound {
- return nil, filer2.ErrNotFound
- }
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
-
- return entry, nil
-}
-
-func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- dir, name := fullpath.DirAndName()
- key, partitionId := genKey(dir, name, store.dbCount)
-
- err = store.dbs[partitionId].Delete(key, nil)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
-
- batch := new(leveldb.Batch)
-
- iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- batch.Delete(append(directoryPrefix, []byte(fileName)...))
- }
- iter.Release()
-
- err = store.dbs[partitionId].Write(batch, nil)
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
- lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
-
- iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
- }
-
- // println("list", entry.FullPath, "chunks", len(entry.Chunks))
-
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
- iter.Release()
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
- key, partitionId = hashToBytes(dirPath, dbCount)
- key = append(key, []byte(fileName)...)
- return key, partitionId
-}
-
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
- keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix, partitionId
-}
-
-func getNameFromKey(key []byte) string {
-
- return string(key[md5.Size:])
-
-}
-
-// hash directory, and use last byte for partitioning
-func hashToBytes(dir string, dbCount int) ([]byte, int) {
- h := md5.New()
- io.WriteString(h, dir)
-
- b := h.Sum(nil)
-
- x := b[len(b)-1]
-
- return b, int(x) % dbCount
-}
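
hashToBytes above is the whole sharding rule for leveldb2: md5 the directory, take the last byte modulo dbCount, and every entry of a given directory lands in the same shard — which is why ListDirectoryEntries only has to scan one db. A runnable sketch of the same computation:

package main

import (
	"crypto/md5"
	"fmt"
	"io"
)

// shard mirrors hashToBytes above: last byte of md5(dir) mod dbCount.
func shard(dir string, dbCount int) int {
	h := md5.New()
	io.WriteString(h, dir)
	sum := h.Sum(nil)
	return int(sum[len(sum)-1]) % dbCount
}

func main() {
	for _, dir := range []string{"/home/chris", "/tmp", "/var/log"} {
		fmt.Println(dir, "->", shard(dir, 8))
	}
}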
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
deleted file mode 100644
index e28ef7dac..000000000
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package leveldb
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "io/ioutil"
- "os"
- "testing"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
- store := &LevelDB2Store{}
- store.initialize(dir, 2)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
- // checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
-
-func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
- store := &LevelDB2Store{}
- store.initialize(dir, 2)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- // checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
- if len(entries) != 0 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go
deleted file mode 100644
index d1b06ece5..000000000
--- a/weed/filer2/mysql/mysql_store.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package mysql
-
-import (
- "database/sql"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
- "github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/go-sql-driver/mysql"
-)
-
-const (
- CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &MysqlStore{})
-}
-
-type MysqlStore struct {
- abstract_sql.AbstractSqlStore
-}
-
-func (store *MysqlStore) GetName() string {
- return "mysql"
-}
-
-func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) {
- return store.initialize(
- configuration.GetString("username"),
- configuration.GetString("password"),
- configuration.GetString("hostname"),
- configuration.GetInt("port"),
- configuration.GetString("database"),
- configuration.GetInt("connection_max_idle"),
- configuration.GetInt("connection_max_open"),
- configuration.GetBool("interpolateParams"),
- )
-}
-
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
- interpolateParams bool) (err error) {
-
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
- store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?"
-
- sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
- if interpolateParams {
- sqlUrl += "&interpolateParams=true"
- }
-
- var dbErr error
- store.DB, dbErr = sql.Open("mysql", sqlUrl)
- if dbErr != nil {
- store.DB = nil
- return fmt.Errorf("cannot connect to %s error:%v", sqlUrl, dbErr)
- }
-
- store.DB.SetMaxIdleConns(maxIdle)
- store.DB.SetMaxOpenConns(maxOpen)
-
- if err = store.DB.Ping(); err != nil {
- return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
- }
-
- return nil
-}
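
The statements above assume a pre-created filemeta table. This diff does not carry the MySQL schema, but one can be inferred from the queries; the sketch below is that inference (column types are assumptions, not taken from this diff), written as a one-off Go setup program:

package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// DSN is illustrative; match it to your own server and credentials.
	db, err := sql.Open("mysql", "root:secret@tcp(localhost:3306)/seaweedfs")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Schema inferred from the SQL statements above; types are assumptions.
	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS filemeta (
		dirhash   BIGINT,
		name      VARCHAR(1000),
		directory TEXT,
		meta      LONGBLOB,
		PRIMARY KEY (dirhash, name)
	)`)
	if err != nil {
		log.Fatal(err)
	}
}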
diff --git a/weed/filer2/permission.go b/weed/filer2/permission.go
deleted file mode 100644
index 8a9508fbc..000000000
--- a/weed/filer2/permission.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package filer2
-
-func hasWritePermission(dir *Entry, entry *Entry) bool {
-
- if dir == nil {
- return false
- }
-
- if dir.Uid == entry.Uid && dir.Mode&0200 > 0 {
- return true
- }
-
- if dir.Gid == entry.Gid && dir.Mode&0020 > 0 {
- return true
- }
-
- if dir.Mode&0002 > 0 {
- return true
- }
-
- return false
-}
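
The octal masks above are the classic Unix write bits: 0200 for owner, 0020 for group, 0002 for other. A tiny sketch of how they decompose a mode:

package main

import "fmt"

func main() {
	mode := 0754 // rwxr-xr--
	fmt.Println("owner can write:", mode&0200 > 0) // true
	fmt.Println("group can write:", mode&0020 > 0) // false
	fmt.Println("other can write:", mode&0002 > 0) // false
}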
diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt
deleted file mode 100644
index cb0c99c63..000000000
--- a/weed/filer2/postgres/README.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-
-1. Create the "seaweedfs" database:
-
-export PGHOME=/Library/PostgreSQL/10
-$PGHOME/bin/createdb --username=postgres --password seaweedfs
-
-2. Create the "filemeta" table (connect with psql, then run the SQL below):
-
-$PGHOME/bin/psql --username=postgres --password seaweedfs
-
-CREATE TABLE IF NOT EXISTS filemeta (
- dirhash BIGINT,
- name VARCHAR(65535),
- directory VARCHAR(65535),
- meta bytea,
- PRIMARY KEY (dirhash, name)
-);
-
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
deleted file mode 100644
index 3ec000fe0..000000000
--- a/weed/filer2/postgres/postgres_store.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package postgres
-
-import (
- "database/sql"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
- "github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/lib/pq"
-)
-
-const (
- CONNECTION_URL_PATTERN = "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &PostgresStore{})
-}
-
-type PostgresStore struct {
- abstract_sql.AbstractSqlStore
-}
-
-func (store *PostgresStore) GetName() string {
- return "postgres"
-}
-
-func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) {
- return store.initialize(
- configuration.GetString("username"),
- configuration.GetString("password"),
- configuration.GetString("hostname"),
- configuration.GetInt("port"),
- configuration.GetString("database"),
- configuration.GetString("sslmode"),
- configuration.GetInt("connection_max_idle"),
- configuration.GetInt("connection_max_open"),
- )
-}
-
-func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
-
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
- store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
-
- sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, password, database, sslmode)
- var dbErr error
- store.DB, dbErr = sql.Open("postgres", sqlUrl)
- if dbErr != nil {
- store.DB = nil
- return fmt.Errorf("cannot connect to %s error:%v", sqlUrl, dbErr)
- }
-
- store.DB.SetMaxIdleConns(maxIdle)
- store.DB.SetMaxOpenConns(maxOpen)
-
- if err = store.DB.Ping(); err != nil {
- return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
- }
-
- return nil
-}
diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go
deleted file mode 100644
index f1ad4b35c..000000000
--- a/weed/filer2/redis/redis_cluster_store.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package redis
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &RedisClusterStore{})
-}
-
-type RedisClusterStore struct {
- UniversalRedisStore
-}
-
-func (store *RedisClusterStore) GetName() string {
- return "redis_cluster"
-}
-
-func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) {
-
- configuration.SetDefault("useReadOnly", true)
- configuration.SetDefault("routeByLatency", true)
-
- return store.initialize(
- configuration.GetStringSlice("addresses"),
- configuration.GetString("password"),
- configuration.GetBool("useReadOnly"),
- configuration.GetBool("routeByLatency"),
- )
-}
-
-func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
- store.Client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: addresses,
- Password: password,
- ReadOnly: readOnly,
- RouteByLatency: routeByLatency,
- })
- return
-}
diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go
deleted file mode 100644
index c56fa014c..000000000
--- a/weed/filer2/redis/redis_store.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package redis
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &RedisStore{})
-}
-
-type RedisStore struct {
- UniversalRedisStore
-}
-
-func (store *RedisStore) GetName() string {
- return "redis"
-}
-
-func (store *RedisStore) Initialize(configuration util.Configuration) (err error) {
- return store.initialize(
- configuration.GetString("address"),
- configuration.GetString("password"),
- configuration.GetInt("database"),
- )
-}
-
-func (store *RedisStore) initialize(hostPort string, password string, database int) (err error) {
- store.Client = redis.NewClient(&redis.Options{
- Addr: hostPort,
- Password: password,
- DB: database,
- })
- return
-}
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
deleted file mode 100644
index 62257e91e..000000000
--- a/weed/filer2/redis/universal_redis_store.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package redis
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/go-redis/redis"
- "sort"
- "strings"
- "time"
-)
-
-const (
- DIR_LIST_MARKER = "\x00"
-)
-
-type UniversalRedisStore struct {
- Client redis.UniversalClient
-}
-
-func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- dir, name := entry.FullPath.DirAndName()
- if name != "" {
- _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
- if err != nil {
- return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
-
- data, err := store.Client.Get(string(fullpath)).Result()
- if err == redis.Nil {
- return nil, filer2.ErrNotFound
- }
-
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", fullpath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks([]byte(data))
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
-
- _, err = store.Client.Del(string(fullpath)).Result()
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- dir, name := fullpath.DirAndName()
- if name != "" {
- _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
- if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
-
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
- if err != nil {
- return fmt.Errorf("delete folder %s : %v", fullpath, err)
- }
-
- for _, fileName := range members {
- path := filer2.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
- if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
-
- // skip
- if startFileName != "" {
- var t []string
- for _, m := range members {
- if strings.Compare(m, startFileName) >= 0 {
- if m == startFileName {
- if inclusive {
- t = append(t, m)
- }
- } else {
- t = append(t, m)
- }
- }
- }
- members = t
- }
-
- // sort
- sort.Slice(members, func(i, j int) bool {
- return strings.Compare(members[i], members[j]) < 0
- })
-
- // limit
- if limit < len(members) {
- members = members[:limit]
- }
-
- // fetch entry meta
- for _, fileName := range members {
- path := filer2.NewFullPath(string(fullpath), fileName)
- entry, err := store.FindEntry(ctx, path)
- if err != nil {
- glog.V(0).Infof("list %s : %v", path, err)
- } else {
- entries = append(entries, entry)
- }
- }
-
- return entries, err
-}
-
-func genDirectoryListKey(dir string) (dirList string) {
- return dir + DIR_LIST_MARKER
-}
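
The store above keeps two structures per entry: the encoded entry under its full-path key, and a per-directory set of child names under dir + DIR_LIST_MARKER (the trailing 0x00 keeps the set's key from colliding with a real entry key). A map-based sketch of that layout:

package main

import "fmt"

func main() {
	kv := map[string]string{}                 // full path -> encoded entry
	children := map[string]map[string]bool{}  // dir list key -> child name set

	listKey := func(dir string) string { return dir + "\x00" } // DIR_LIST_MARKER

	put := func(dir, name string) {
		kv[dir+"/"+name] = "meta" // Set(...) in the real store
		if children[listKey(dir)] == nil {
			children[listKey(dir)] = map[string]bool{}
		}
		children[listKey(dir)][name] = true // SAdd(...) in the real store
	}

	put("/home", "a.txt")
	put("/home", "b.txt")
	fmt.Println(len(children[listKey("/home")]), "children listed") // 2
}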
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
deleted file mode 100644
index 01b87cad1..000000000
--- a/weed/filer2/stream.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package filer2
-
-import (
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error {
-
- chunkViews := ViewFromChunks(chunks, offset, size)
-
- fileId2Url := make(map[string]string)
-
- for _, chunkView := range chunkViews {
-
- urlString, err := masterClient.LookupFileId(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- fileId2Url[chunkView.FileId] = urlString
- }
-
- for _, chunkView := range chunkViews {
- urlString := fileId2Url[chunkView.FileId]
- _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- }
-
- return nil
-
-}
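
util.ReadUrlAsStream is used above in callback style: chunks are pushed into a func(data []byte) rather than returned as one buffer, so StreamContent can copy straight into the writer without holding a whole chunk in memory. A sketch of that shape, with a hypothetical readAsStream over an io.Reader:

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// readAsStream is hypothetical; it mirrors the callback shape of
// util.ReadUrlAsStream, pushing each read into fn.
func readAsStream(r io.Reader, fn func(data []byte)) error {
	buf := make([]byte, 4)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			fn(buf[:n])
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	var out bytes.Buffer
	readAsStream(strings.NewReader("hello world"), func(data []byte) {
		out.Write(data)
	})
	fmt.Println(out.String())
}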
diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go
deleted file mode 100644
index 4eb8cb90d..000000000
--- a/weed/filer2/tikv/tikv_store.go
+++ /dev/null
@@ -1,251 +0,0 @@
-// +build !386
-// +build !arm
-
-package tikv
-
-import (
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-
- "github.com/pingcap/tidb/kv"
- "github.com/pingcap/tidb/store/tikv"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &TikvStore{})
-}
-
-type TikvStore struct {
- store kv.Storage
-}
-
-func (store *TikvStore) GetName() string {
- return "tikv"
-}
-
-func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
- pdAddr := configuration.GetString("pdAddress")
- return store.initialize(pdAddr)
-}
-
-func (store *TikvStore) initialize(pdAddr string) (err error) {
- glog.Infof("filer store tikv pd address: %s", pdAddr)
-
- driver := tikv.Driver{}
-
- store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr))
-
- if err != nil {
- return fmt.Errorf("open tikv %s : %v", pdAddr, err)
- }
-
- return
-}
-
-func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- tx, err := store.store.Begin()
- if err != nil {
- return ctx, err
- }
- return context.WithValue(ctx, "tx", tx), nil
-}
-func (store *TikvStore) CommitTransaction(ctx context.Context) error {
- tx, ok := ctx.Value("tx").(kv.Transaction)
- if ok {
- return tx.Commit(ctx)
- }
- return nil
-}
-func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
- tx, ok := ctx.Value("tx").(kv.Transaction)
- if ok {
- return tx.Rollback()
- }
- return nil
-}
-
-func (store *TikvStore) getTx(ctx context.Context) kv.Transaction {
- if tx, ok := ctx.Value("tx").(kv.Transaction); ok {
- return tx
- }
- return nil
-}
-
-func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- dir, name := entry.DirAndName()
- key := genKey(dir, name)
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- err = store.getTx(ctx).Set(key, value)
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
-
- return nil
-}
-
-func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- dir, name := fullpath.DirAndName()
- key := genKey(dir, name)
-
- data, err := store.getTx(ctx).Get(ctx, key)
-
- if err == kv.ErrNotExist {
- return nil, filer2.ErrNotFound
- }
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
-
- return entry, nil
-}
-
-func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- dir, name := fullpath.DirAndName()
- key := genKey(dir, name)
-
- err = store.getTx(ctx).Delete(key)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- tx := store.getTx(ctx)
-
- iter, err := tx.Iter(directoryPrefix, nil)
- if err != nil {
- return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
- }
- defer iter.Close()
- for iter.Valid() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- iter.Next()
- continue
- }
-
- if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- iter.Next()
- }
-
- return nil
-}
-
-func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
- lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName)
-
- iter, err := store.getTx(ctx).Iter(lastFileStart, nil)
- if err != nil {
- return nil, fmt.Errorf("list %s: %v", fullpath, err)
- }
- defer iter.Close()
- for iter.Valid() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- iter.Next()
- continue
- }
- if fileName == startFileName && !inclusive {
- iter.Next()
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
- }
-
- // println("list", entry.FullPath, "chunks", len(entry.Chunks))
-
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- iter.Next()
- }
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string) (key []byte) {
- key = hashToBytes(dirPath)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = hashToBytes(string(fullpath))
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func getNameFromKey(key []byte) string {
-
- return string(key[md5.Size:])
-
-}
-
-// hash directory
-func hashToBytes(dir string) []byte {
- h := md5.New()
- io.WriteString(h, dir)
-
- b := h.Sum(nil)
-
- return b
-}
diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go
deleted file mode 100644
index 36de2d974..000000000
--- a/weed/filer2/tikv/tikv_store_unsupported.go
+++ /dev/null
@@ -1,65 +0,0 @@
-// +build 386 arm
-
-package tikv
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &TikvStore{})
-}
-
-type TikvStore struct {
-}
-
-func (store *TikvStore) GetName() string {
- return "tikv"
-}
-
-func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) initialize(pdAddr string) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return nil, fmt.Errorf("not implemented for 32 bit computers")
-}
-func (store *TikvStore) CommitTransaction(ctx context.Context) error {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- return nil, fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
- return fmt.Errorf("not implemented for 32 bit computers")
-}
-
-func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
- return nil, fmt.Errorf("not implemented for 32 bit computers")
-}
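
These build constraints are what select between the two TikvStore files: multiple // +build lines AND together (so the real store requires both !386 and !arm), while space-separated terms within one line OR (so this stub builds on 386 or arm). A minimal sketch of the same two-file pattern for a hypothetical package:

// supported.go — compiled everywhere except 386 and arm
// +build !386
// +build !arm

package tikvdemo

func Supported() bool { return true }

// unsupported.go — compiled only on 386 or arm
// +build 386 arm

package tikvdemo

func Supported() bool { return false }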