aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go206
-rw-r--r--weed/filer2/cassandra/README.txt14
-rw-r--r--weed/filer2/cassandra/cassandra_store.go163
-rw-r--r--weed/filer2/configuration.go50
-rw-r--r--weed/filer2/entry.go91
-rw-r--r--weed/filer2/entry_codec.go124
-rw-r--r--weed/filer2/etcd/etcd_store.go204
-rw-r--r--weed/filer2/filechunk_manifest.go168
-rw-r--r--weed/filer2/filechunk_manifest_test.go113
-rw-r--r--weed/filer2/filechunks.go284
-rw-r--r--weed/filer2/filechunks2_test.go46
-rw-r--r--weed/filer2/filechunks_test.go539
-rw-r--r--weed/filer2/filer.go290
-rw-r--r--weed/filer2/filer_buckets.go121
-rw-r--r--weed/filer2/filer_delete_entry.go129
-rw-r--r--weed/filer2/filer_deletion.go109
-rw-r--r--weed/filer2/filer_notify.go169
-rw-r--r--weed/filer2/filer_notify_append.go73
-rw-r--r--weed/filer2/filer_notify_test.go53
-rw-r--r--weed/filer2/filerstore.go208
-rw-r--r--weed/filer2/leveldb/leveldb_store.go231
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go88
-rw-r--r--weed/filer2/leveldb2/leveldb2_local_store.go43
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go251
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go88
-rw-r--r--weed/filer2/meta_aggregator.go131
-rw-r--r--weed/filer2/meta_replay.go37
-rw-r--r--weed/filer2/mongodb/mongodb_store.go214
-rw-r--r--weed/filer2/mysql/mysql_store.go74
-rw-r--r--weed/filer2/permission.go22
-rw-r--r--weed/filer2/postgres/README.txt17
-rw-r--r--weed/filer2/postgres/postgres_store.go75
-rw-r--r--weed/filer2/reader_at.go149
-rw-r--r--weed/filer2/reader_at_test.go156
-rw-r--r--weed/filer2/redis/redis_cluster_store.go42
-rw-r--r--weed/filer2/redis/redis_store.go36
-rw-r--r--weed/filer2/redis/universal_redis_store.go191
-rw-r--r--weed/filer2/redis2/redis_cluster_store.go42
-rw-r--r--weed/filer2/redis2/redis_store.go36
-rw-r--r--weed/filer2/redis2/universal_redis_store.go166
-rw-r--r--weed/filer2/stream.go204
-rw-r--r--weed/filer2/topics.go6
42 files changed, 0 insertions, 5453 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
deleted file mode 100644
index ce6377aac..000000000
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ /dev/null
@@ -1,206 +0,0 @@
-package abstract_sql
-
-import (
- "context"
- "database/sql"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlDeleteFolderChildren string
- SqlListExclusive string
- SqlListInclusive string
-}
-
-type TxOrDB interface {
- ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
- QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
- QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
-}
-
-func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
- Isolation: sql.LevelReadCommitted,
- ReadOnly: false,
- })
- if err != nil {
- return ctx, err
- }
-
- return context.WithValue(ctx, "tx", tx), nil
-}
-func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
- if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx.Commit()
- }
- return nil
-}
-func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
- if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx.Rollback()
- }
- return nil
-}
-
-func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
- if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx
- }
- return store.DB
-}
-
-func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
- if err != nil {
- return fmt.Errorf("insert %s: %s", entry.FullPath, err)
- }
-
- affectedRows, err := res.RowsAffected()
- if err == nil && affectedRows > 0 {
- return nil
- }
-
- // now the insert failed possibly due to duplication constraints
- glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err)
-
- res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
- if err != nil {
- return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err)
- }
- return nil
-
-}
-
-func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
- if err != nil {
- return fmt.Errorf("update %s: %s", entry.FullPath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
- }
- return nil
-}
-
-func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) {
-
- dir, name := fullpath.DirAndName()
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
- var data []byte
- if err := row.Scan(&data); err != nil {
- return nil, filer_pb.ErrNotFound
- }
-
- entry := &filer2.Entry{
- FullPath: fullpath,
- }
- if err := entry.DecodeAttributesAndChunks(data); err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
-
- dir, name := fullpath.DirAndName()
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
- if err != nil {
- return fmt.Errorf("delete %s: %s", fullpath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
- }
-
- return nil
-}
-
-func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
-
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
- if err != nil {
- return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
- }
-
- _, err = res.RowsAffected()
- if err != nil {
- return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
- }
-
- return nil
-}
-
-func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- sqlText := store.SqlListExclusive
- if inclusive {
- sqlText = store.SqlListInclusive
- }
-
- rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit)
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
- defer rows.Close()
-
- for rows.Next() {
- var name string
- var data []byte
- if err = rows.Scan(&name, &data); err != nil {
- glog.V(0).Infof("scan %s : %v", fullpath, err)
- return nil, fmt.Errorf("scan %s: %v", fullpath, err)
- }
-
- entry := &filer2.Entry{
- FullPath: util.NewFullPath(string(fullpath), name),
- }
- if err = entry.DecodeAttributesAndChunks(data); err != nil {
- glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
- return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
- }
-
- entries = append(entries, entry)
- }
-
- return entries, nil
-}
-
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
-}
-
-func (store *AbstractSqlStore) Shutdown() {
- store.DB.Close()
-}
diff --git a/weed/filer2/cassandra/README.txt b/weed/filer2/cassandra/README.txt
deleted file mode 100644
index 122c9c3f4..000000000
--- a/weed/filer2/cassandra/README.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-1. create a keyspace
-
-CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
-
-2. create filemeta table
-
- USE seaweedfs;
-
- CREATE TABLE filemeta (
- directory varchar,
- name varchar,
- meta blob,
- PRIMARY KEY (directory, name)
- ) WITH CLUSTERING ORDER BY (name ASC);
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
deleted file mode 100644
index 4d845c2fa..000000000
--- a/weed/filer2/cassandra/cassandra_store.go
+++ /dev/null
@@ -1,163 +0,0 @@
-package cassandra
-
-import (
- "context"
- "fmt"
- "github.com/gocql/gocql"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &CassandraStore{})
-}
-
-type CassandraStore struct {
- cluster *gocql.ClusterConfig
- session *gocql.Session
-}
-
-func (store *CassandraStore) GetName() string {
- return "cassandra"
-}
-
-func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- return store.initialize(
- configuration.GetString(prefix+"keyspace"),
- configuration.GetStringSlice(prefix+"hosts"),
- )
-}
-
-func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
- store.cluster = gocql.NewCluster(hosts...)
- store.cluster.Keyspace = keyspace
- store.cluster.Consistency = gocql.LocalQuorum
- store.session, err = store.cluster.CreateSession()
- if err != nil {
- glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
- }
- return
-}
-
-func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- if err := store.session.Query(
- "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
- dir, name, meta, entry.TtlSec).Exec(); err != nil {
- return fmt.Errorf("insert %s: %s", entry.FullPath, err)
- }
-
- return nil
-}
-
-func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
-
- dir, name := fullpath.DirAndName()
- var data []byte
- if err := store.session.Query(
- "SELECT meta FROM filemeta WHERE directory=? AND name=?",
- dir, name).Consistency(gocql.One).Scan(&data); err != nil {
- if err != gocql.ErrNotFound {
- return nil, filer_pb.ErrNotFound
- }
- }
-
- if len(data) == 0 {
- return nil, filer_pb.ErrNotFound
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
-
- dir, name := fullpath.DirAndName()
-
- if err := store.session.Query(
- "DELETE FROM filemeta WHERE directory=? AND name=?",
- dir, name).Exec(); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
-
- if err := store.session.Query(
- "DELETE FROM filemeta WHERE directory=?",
- fullpath).Exec(); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
- if inclusive {
- cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
- }
-
- var data []byte
- var name string
- iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
- for iter.Scan(&name, &data) {
- entry := &filer2.Entry{
- FullPath: util.NewFullPath(string(fullpath), name),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
- if err := iter.Close(); err != nil {
- glog.V(0).Infof("list iterator close: %v", err)
- }
-
- return entries, err
-}
-
-func (store *CassandraStore) Shutdown() {
- store.session.Close()
-}
diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go
deleted file mode 100644
index a174117ea..000000000
--- a/weed/filer2/configuration.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package filer2
-
-import (
- "os"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/spf13/viper"
-)
-
-var (
- Stores []FilerStore
-)
-
-func (f *Filer) LoadConfiguration(config *viper.Viper) {
-
- validateOneEnabledStore(config)
-
- for _, store := range Stores {
- if config.GetBool(store.GetName() + ".enabled") {
- if err := store.Initialize(config, store.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize store for %s: %+v",
- store.GetName(), err)
- }
- f.SetStore(store)
- glog.V(0).Infof("Configure filer for %s", store.GetName())
- return
- }
- }
-
- println()
- println("Supported filer stores are:")
- for _, store := range Stores {
- println(" " + store.GetName())
- }
-
- os.Exit(-1)
-}
-
-func validateOneEnabledStore(config *viper.Viper) {
- enabledStore := ""
- for _, store := range Stores {
- if config.GetBool(store.GetName() + ".enabled") {
- if enabledStore == "" {
- enabledStore = store.GetName()
- } else {
- glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName())
- }
- }
- }
-}
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
deleted file mode 100644
index fedfde40d..000000000
--- a/weed/filer2/entry.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package filer2
-
-import (
- "os"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type Attr struct {
- Mtime time.Time // time of last modification
- Crtime time.Time // time of creation (OS X only)
- Mode os.FileMode // file mode
- Uid uint32 // owner uid
- Gid uint32 // group gid
- Mime string // mime type
- Replication string // replication
- Collection string // collection name
- TtlSec int32 // ttl in seconds
- UserName string
- GroupNames []string
- SymlinkTarget string
- Md5 []byte
- FileSize uint64
-}
-
-func (attr Attr) IsDirectory() bool {
- return attr.Mode&os.ModeDir > 0
-}
-
-type Entry struct {
- util.FullPath
-
- Attr
- Extended map[string][]byte
-
- // the following is for files
- Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
-}
-
-func (entry *Entry) Size() uint64 {
- return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
-}
-
-func (entry *Entry) Timestamp() time.Time {
- if entry.IsDirectory() {
- return entry.Crtime
- } else {
- return entry.Mtime
- }
-}
-
-func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
- if entry == nil {
- return nil
- }
- return &filer_pb.Entry{
- Name: entry.FullPath.Name(),
- IsDirectory: entry.IsDirectory(),
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- }
-}
-
-func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
- if entry == nil {
- return nil
- }
- dir, _ := entry.FullPath.DirAndName()
- return &filer_pb.FullEntry{
- Dir: dir,
- Entry: entry.ToProtoEntry(),
- }
-}
-
-func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
- return &Entry{
- FullPath: util.NewFullPath(dir, entry.Name),
- Attr: PbToEntryAttribute(entry.Attributes),
- Chunks: entry.Chunks,
- }
-}
-
-func maxUint64(x, y uint64) uint64 {
- if x > y {
- return x
- }
- return y
-}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
deleted file mode 100644
index 4d615194f..000000000
--- a/weed/filer2/entry_codec.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package filer2
-
-import (
- "bytes"
- "fmt"
- "os"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
- message := &filer_pb.Entry{
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- }
- return proto.Marshal(message)
-}
-
-func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
-
- message := &filer_pb.Entry{}
-
- if err := proto.UnmarshalMerge(blob, message); err != nil {
- return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
- }
-
- entry.Attr = PbToEntryAttribute(message.Attributes)
-
- entry.Extended = message.Extended
-
- entry.Chunks = message.Chunks
-
- return nil
-}
-
-func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
-
- return &filer_pb.FuseAttributes{
- Crtime: entry.Attr.Crtime.Unix(),
- Mtime: entry.Attr.Mtime.Unix(),
- FileMode: uint32(entry.Attr.Mode),
- Uid: entry.Uid,
- Gid: entry.Gid,
- Mime: entry.Mime,
- Collection: entry.Attr.Collection,
- Replication: entry.Attr.Replication,
- TtlSec: entry.Attr.TtlSec,
- UserName: entry.Attr.UserName,
- GroupName: entry.Attr.GroupNames,
- SymlinkTarget: entry.Attr.SymlinkTarget,
- Md5: entry.Attr.Md5,
- FileSize: entry.Attr.FileSize,
- }
-}
-
-func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
-
- t := Attr{}
-
- t.Crtime = time.Unix(attr.Crtime, 0)
- t.Mtime = time.Unix(attr.Mtime, 0)
- t.Mode = os.FileMode(attr.FileMode)
- t.Uid = attr.Uid
- t.Gid = attr.Gid
- t.Mime = attr.Mime
- t.Collection = attr.Collection
- t.Replication = attr.Replication
- t.TtlSec = attr.TtlSec
- t.UserName = attr.UserName
- t.GroupNames = attr.GroupName
- t.SymlinkTarget = attr.SymlinkTarget
- t.Md5 = attr.Md5
- t.FileSize = attr.FileSize
-
- return t
-}
-
-func EqualEntry(a, b *Entry) bool {
- if a == b {
- return true
- }
- if a == nil && b != nil || a != nil && b == nil {
- return false
- }
- if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) {
- return false
- }
- if len(a.Chunks) != len(b.Chunks) {
- return false
- }
-
- if !eq(a.Extended, b.Extended) {
- return false
- }
-
- if !bytes.Equal(a.Md5, b.Md5) {
- return false
- }
-
- for i := 0; i < len(a.Chunks); i++ {
- if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
- return false
- }
- }
- return true
-}
-
-func eq(a, b map[string][]byte) bool {
- if len(a) != len(b) {
- return false
- }
-
- for k, v := range a {
- if w, ok := b[k]; !ok || !bytes.Equal(v, w) {
- return false
- }
- }
-
- return true
-}
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
deleted file mode 100644
index 6f4c3ce5c..000000000
--- a/weed/filer2/etcd/etcd_store.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package etcd
-
-import (
- "context"
- "fmt"
- "strings"
- "time"
-
- "go.etcd.io/etcd/clientv3"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const (
- DIR_FILE_SEPARATOR = byte(0x00)
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &EtcdStore{})
-}
-
-type EtcdStore struct {
- client *clientv3.Client
-}
-
-func (store *EtcdStore) GetName() string {
- return "etcd"
-}
-
-func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
- servers := configuration.GetString(prefix + "servers")
- if servers == "" {
- servers = "localhost:2379"
- }
-
- timeout := configuration.GetString(prefix + "timeout")
- if timeout == "" {
- timeout = "3s"
- }
-
- return store.initialize(servers, timeout)
-}
-
-func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
- glog.Infof("filer store etcd: %s", servers)
-
- to, err := time.ParseDuration(timeout)
- if err != nil {
- return fmt.Errorf("parse timeout %s: %s", timeout, err)
- }
-
- store.client, err = clientv3.New(clientv3.Config{
- Endpoints: strings.Split(servers, ","),
- DialTimeout: to,
- })
- if err != nil {
- return fmt.Errorf("connect to etcd %s: %s", servers, err)
- }
-
- return
-}
-
-func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- key := genKey(entry.DirAndName())
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- return nil
-}
-
-func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
- key := genKey(fullpath.DirAndName())
-
- resp, err := store.client.Get(ctx, string(key))
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- if len(resp.Kvs) == 0 {
- return nil, filer_pb.ErrNotFound
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- key := genKey(fullpath.DirAndName())
-
- if _, err := store.client.Delete(ctx, string(key)); err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
- return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- resp, err := store.client.Get(ctx, string(directoryPrefix),
- clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
-
- for _, kv := range resp.Kvs {
- fileName := getNameFromKey(kv.Key)
- if fileName == "" {
- continue
- }
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string) (key []byte) {
- key = []byte(dirPath)
- key = append(key, DIR_FILE_SEPARATOR)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = []byte(string(fullpath))
- keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func getNameFromKey(key []byte) string {
- sepIndex := len(key) - 1
- for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
- sepIndex--
- }
-
- return string(key[sepIndex+1:])
-}
-
-func (store *EtcdStore) Shutdown() {
- store.client.Close()
-}
diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
deleted file mode 100644
index ba4625bab..000000000
--- a/weed/filer2/filechunk_manifest.go
+++ /dev/null
@@ -1,168 +0,0 @@
-package filer2
-
-import (
- "bytes"
- "fmt"
- "io"
- "math"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const (
- ManifestBatch = 1000
-)
-
-func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
- for _, chunk := range chunks {
- if chunk.IsChunkManifest {
- return true
- }
- }
- return false
-}
-
-func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
- for _, c := range chunks {
- if c.IsChunkManifest {
- manifestChunks = append(manifestChunks, c)
- } else {
- nonManifestChunks = append(nonManifestChunks, c)
- }
- }
- return
-}
-
-func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
- // TODO maybe parallel this
- for _, chunk := range chunks {
- if !chunk.IsChunkManifest {
- dataChunks = append(dataChunks, chunk)
- continue
- }
-
- resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
- if err != nil {
- return chunks, nil, err
- }
-
- manifestChunks = append(manifestChunks, chunk)
- // recursive
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
- if subErr != nil {
- return chunks, nil, subErr
- }
- dataChunks = append(dataChunks, dchunks...)
- manifestChunks = append(manifestChunks, mchunks...)
- }
- return
-}
-
-func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
- if !chunk.IsChunkManifest {
- return
- }
-
- // IsChunkManifest
- data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
- if err != nil {
- return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
- }
- m := &filer_pb.FileChunkManifest{}
- if err := proto.Unmarshal(data, m); err != nil {
- return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
- }
-
- // recursive
- filer_pb.AfterEntryDeserialization(m.Chunks)
- return m.Chunks, nil
-}
-
-// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
- urlString, err := lookupFileIdFn(fileId)
- if err != nil {
- glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
- }
- var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(0).Infof("read %s failed, err: %v", fileId, err)
- return nil, err
- }
-
- return buffer.Bytes(), nil
-}
-
-func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
- return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
-}
-
-func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
-
- var dataChunks []*filer_pb.FileChunk
- for _, chunk := range inputChunks {
- if !chunk.IsChunkManifest {
- dataChunks = append(dataChunks, chunk)
- } else {
- chunks = append(chunks, chunk)
- }
- }
-
- remaining := len(dataChunks)
- for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
- chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
- if err != nil {
- return dataChunks, err
- }
- chunks = append(chunks, chunk)
- remaining -= mergeFactor
- }
- // remaining
- for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
- chunks = append(chunks, dataChunks[i])
- }
- return
-}
-
-func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
-
- filer_pb.BeforeEntrySerialization(dataChunks)
-
- // create and serialize the manifest
- data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
- Chunks: dataChunks,
- })
- if serErr != nil {
- return nil, fmt.Errorf("serializing manifest: %v", serErr)
- }
-
- minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
- for _, chunk := range dataChunks {
- if minOffset > int64(chunk.Offset) {
- minOffset = chunk.Offset
- }
- if maxOffset < int64(chunk.Size)+chunk.Offset {
- maxOffset = int64(chunk.Size) + chunk.Offset
- }
- }
-
- manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
- if err != nil {
- return nil, err
- }
- manifestChunk.IsChunkManifest = true
- manifestChunk.Offset = minOffset
- manifestChunk.Size = uint64(maxOffset - minOffset)
-
- return
-}
-
-type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)
diff --git a/weed/filer2/filechunk_manifest_test.go b/weed/filer2/filechunk_manifest_test.go
deleted file mode 100644
index 2b0862d07..000000000
--- a/weed/filer2/filechunk_manifest_test.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package filer2
-
-import (
- "bytes"
- "math"
- "testing"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func TestDoMaybeManifestize(t *testing.T) {
- var manifestTests = []struct {
- inputs []*filer_pb.FileChunk
- expected []*filer_pb.FileChunk
- }{
- {
- inputs: []*filer_pb.FileChunk{
- {FileId: "1", IsChunkManifest: false},
- {FileId: "2", IsChunkManifest: false},
- {FileId: "3", IsChunkManifest: false},
- {FileId: "4", IsChunkManifest: false},
- },
- expected: []*filer_pb.FileChunk{
- {FileId: "12", IsChunkManifest: true},
- {FileId: "34", IsChunkManifest: true},
- },
- },
- {
- inputs: []*filer_pb.FileChunk{
- {FileId: "1", IsChunkManifest: true},
- {FileId: "2", IsChunkManifest: false},
- {FileId: "3", IsChunkManifest: false},
- {FileId: "4", IsChunkManifest: false},
- },
- expected: []*filer_pb.FileChunk{
- {FileId: "1", IsChunkManifest: true},
- {FileId: "23", IsChunkManifest: true},
- {FileId: "4", IsChunkManifest: false},
- },
- },
- {
- inputs: []*filer_pb.FileChunk{
- {FileId: "1", IsChunkManifest: false},
- {FileId: "2", IsChunkManifest: true},
- {FileId: "3", IsChunkManifest: false},
- {FileId: "4", IsChunkManifest: false},
- },
- expected: []*filer_pb.FileChunk{
- {FileId: "2", IsChunkManifest: true},
- {FileId: "13", IsChunkManifest: true},
- {FileId: "4", IsChunkManifest: false},
- },
- },
- {
- inputs: []*filer_pb.FileChunk{
- {FileId: "1", IsChunkManifest: true},
- {FileId: "2", IsChunkManifest: true},
- {FileId: "3", IsChunkManifest: false},
- {FileId: "4", IsChunkManifest: false},
- },
- expected: []*filer_pb.FileChunk{
- {FileId: "1", IsChunkManifest: true},
- {FileId: "2", IsChunkManifest: true},
- {FileId: "34", IsChunkManifest: true},
- },
- },
- }
-
- for i, mtest := range manifestTests {
- println("test", i)
- actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge)
- assertEqualChunks(t, mtest.expected, actual)
- }
-
-}
-
-func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) {
- assert.Equal(t, len(expected), len(actual))
- for i := 0; i < len(actual); i++ {
- assertEqualChunk(t, actual[i], expected[i])
- }
-}
-func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) {
- assert.Equal(t, expected.FileId, actual.FileId)
- assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest)
-}
-
-func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
-
- var buf bytes.Buffer
- minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
- for k := 0; k < len(dataChunks); k++ {
- chunk := dataChunks[k]
- buf.WriteString(chunk.FileId)
- if minOffset > int64(chunk.Offset) {
- minOffset = chunk.Offset
- }
- if maxOffset < int64(chunk.Size)+chunk.Offset {
- maxOffset = int64(chunk.Size) + chunk.Offset
- }
- }
-
- manifestChunk = &filer_pb.FileChunk{
- FileId: buf.String(),
- }
- manifestChunk.IsChunkManifest = true
- manifestChunk.Offset = minOffset
- manifestChunk.Size = uint64(maxOffset - minOffset)
-
- return
-}
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
deleted file mode 100644
index 53c679d6b..000000000
--- a/weed/filer2/filechunks.go
+++ /dev/null
@@ -1,284 +0,0 @@
-package filer2
-
-import (
- "fmt"
- "hash/fnv"
- "math"
- "sort"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
- for _, c := range chunks {
- t := uint64(c.Offset + int64(c.Size))
- if size < t {
- size = t
- }
- }
- return
-}
-
-func FileSize(entry *filer_pb.Entry) (size uint64) {
- return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
-}
-
-func ETag(entry *filer_pb.Entry) (etag string) {
- if entry.Attributes == nil || entry.Attributes.Md5 == nil {
- return ETagChunks(entry.Chunks)
- }
- return fmt.Sprintf("%x", entry.Attributes.Md5)
-}
-
-func ETagEntry(entry *Entry) (etag string) {
- if entry.Attr.Md5 == nil {
- return ETagChunks(entry.Chunks)
- }
- return fmt.Sprintf("%x", entry.Attr.Md5)
-}
-
-func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
- if len(chunks) == 1 {
- return chunks[0].ETag
- }
-
- h := fnv.New32a()
- for _, c := range chunks {
- h.Write([]byte(c.ETag))
- }
- return fmt.Sprintf("%x", h.Sum32())
-}
-
-func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
-
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
-
- fileIds := make(map[string]bool)
- for _, interval := range visibles {
- fileIds[interval.fileId] = true
- }
- for _, chunk := range chunks {
- if _, found := fileIds[chunk.GetFileIdString()]; found {
- compacted = append(compacted, chunk)
- } else {
- garbage = append(garbage, chunk)
- }
- }
-
- return
-}
-
-func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
-
- aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
- if aErr != nil {
- return nil, aErr
- }
- bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
- if bErr != nil {
- return nil, bErr
- }
-
- delta = append(delta, DoMinusChunks(aData, bData)...)
- delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
- return
-}
-
-func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
-
- fileIds := make(map[string]bool)
- for _, interval := range bs {
- fileIds[interval.GetFileIdString()] = true
- }
- for _, chunk := range as {
- if _, found := fileIds[chunk.GetFileIdString()]; !found {
- delta = append(delta, chunk)
- }
- }
-
- return
-}
-
-type ChunkView struct {
- FileId string
- Offset int64
- Size uint64
- LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
- ChunkSize uint64
- CipherKey []byte
- IsGzipped bool
-}
-
-func (cv *ChunkView) IsFullChunk() bool {
- return cv.Size == cv.ChunkSize
-}
-
-func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
-
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
-
- return ViewFromVisibleIntervals(visibles, offset, size)
-
-}
-
-func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int64) (views []*ChunkView) {
-
- stop := offset + size
- if size == math.MaxInt64 {
- stop = math.MaxInt64
- }
- if stop < offset {
- stop = math.MaxInt64
- }
-
- for _, chunk := range visibles {
-
- chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
-
- if chunkStart < chunkStop {
- views = append(views, &ChunkView{
- FileId: chunk.fileId,
- Offset: chunkStart - chunk.start + chunk.chunkOffset,
- Size: uint64(chunkStop - chunkStart),
- LogicOffset: chunkStart,
- ChunkSize: chunk.chunkSize,
- CipherKey: chunk.cipherKey,
- IsGzipped: chunk.isGzipped,
- })
- }
- }
-
- return views
-
-}
-
-func logPrintf(name string, visibles []VisibleInterval) {
-
- /*
- glog.V(0).Infof("%s len %d", name, len(visibles))
- for _, v := range visibles {
- glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset)
- }
- */
-}
-
-var bufPool = sync.Pool{
- New: func() interface{} {
- return new(VisibleInterval)
- },
-}
-
-func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) {
-
- newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
-
- length := len(visibles)
- if length == 0 {
- return append(visibles, newV)
- }
- last := visibles[length-1]
- if last.stop <= chunk.Offset {
- return append(visibles, newV)
- }
-
- logPrintf(" before", visibles)
- // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size)
- chunkStop := chunk.Offset + int64(chunk.Size)
- for _, v := range visibles {
- if v.start < chunk.Offset && chunk.Offset < v.stop {
- t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)
- newVisibles = append(newVisibles, t)
- // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
- }
- if v.start < chunkStop && chunkStop < v.stop {
- t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)
- newVisibles = append(newVisibles, t)
- // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
- }
- if chunkStop <= v.start || v.stop <= chunk.Offset {
- newVisibles = append(newVisibles, v)
- // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop)
- }
- }
- newVisibles = append(newVisibles, newV)
-
- logPrintf(" append", newVisibles)
-
- for i := len(newVisibles) - 1; i >= 0; i-- {
- if i > 0 && newV.start < newVisibles[i-1].start {
- newVisibles[i] = newVisibles[i-1]
- } else {
- newVisibles[i] = newV
- break
- }
- }
- logPrintf(" sorted", newVisibles)
-
- return newVisibles
-}
-
-// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
-// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
-
- chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
-
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Mtime == chunks[j].Mtime {
- return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
- }
- return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
- })
-
- for _, chunk := range chunks {
-
- // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
- visibles = MergeIntoVisibles(visibles, chunk)
-
- logPrintf("add", visibles)
-
- }
-
- return
-}
-
-// find non-overlapping visible intervals
-// visible interval map to one file chunk
-
-type VisibleInterval struct {
- start int64
- stop int64
- modifiedTime int64
- fileId string
- chunkOffset int64
- chunkSize uint64
- cipherKey []byte
- isGzipped bool
-}
-
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
- return VisibleInterval{
- start: start,
- stop: stop,
- fileId: fileId,
- modifiedTime: modifiedTime,
- chunkOffset: chunkOffset, // the starting position in the chunk
- chunkSize: chunkSize,
- cipherKey: cipherKey,
- isGzipped: isGzipped,
- }
-}
-
-func min(x, y int64) int64 {
- if x <= y {
- return x
- }
- return y
-}
-func max(x, y int64) int64 {
- if x <= y {
- return y
- }
- return x
-}
diff --git a/weed/filer2/filechunks2_test.go b/weed/filer2/filechunks2_test.go
deleted file mode 100644
index d896da3cc..000000000
--- a/weed/filer2/filechunks2_test.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package filer2
-
-import (
- "sort"
- "testing"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func TestCompactFileChunksRealCase(t *testing.T) {
-
- chunks := []*filer_pb.FileChunk{
- {FileId: "2,512f31f2c0700a", Offset: 0, Size: 25 - 0, Mtime: 5320497},
- {FileId: "6,512f2c2e24e9e8", Offset: 868352, Size: 917585 - 868352, Mtime: 5320492},
- {FileId: "7,514468dd5954ca", Offset: 884736, Size: 901120 - 884736, Mtime: 5325928},
- {FileId: "5,5144463173fe77", Offset: 917504, Size: 2297856 - 917504, Mtime: 5325894},
- {FileId: "4,51444c7ab54e2d", Offset: 2301952, Size: 2367488 - 2301952, Mtime: 5325900},
- {FileId: "4,514450e643ad22", Offset: 2371584, Size: 2420736 - 2371584, Mtime: 5325904},
- {FileId: "6,514456a5e9e4d7", Offset: 2449408, Size: 2490368 - 2449408, Mtime: 5325910},
- {FileId: "3,51444f8d53eebe", Offset: 2494464, Size: 2555904 - 2494464, Mtime: 5325903},
- {FileId: "4,5144578b097c7e", Offset: 2560000, Size: 2596864 - 2560000, Mtime: 5325911},
- {FileId: "3,51445500b6b4ac", Offset: 2637824, Size: 2678784 - 2637824, Mtime: 5325909},
- {FileId: "1,51446285e52a61", Offset: 2695168, Size: 2715648 - 2695168, Mtime: 5325922},
- }
-
- printChunks("before", chunks)
-
- compacted, garbage := CompactFileChunks(nil, chunks)
-
- printChunks("compacted", compacted)
- printChunks("garbage", garbage)
-
-}
-
-func printChunks(name string, chunks []*filer_pb.FileChunk) {
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Offset == chunks[j].Offset {
- return chunks[i].Mtime < chunks[j].Mtime
- }
- return chunks[i].Offset < chunks[j].Offset
- })
- for _, chunk := range chunks {
- glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
-}
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
deleted file mode 100644
index 31b74a22a..000000000
--- a/weed/filer2/filechunks_test.go
+++ /dev/null
@@ -1,539 +0,0 @@
-package filer2
-
-import (
- "fmt"
- "log"
- "math"
- "math/rand"
- "strconv"
- "testing"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func TestCompactFileChunks(t *testing.T) {
- chunks := []*filer_pb.FileChunk{
- {Offset: 10, Size: 100, FileId: "abc", Mtime: 50},
- {Offset: 100, Size: 100, FileId: "def", Mtime: 100},
- {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200},
- {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
- }
-
- compacted, garbage := CompactFileChunks(nil, chunks)
-
- if len(compacted) != 3 {
- t.Fatalf("unexpected compacted: %d", len(compacted))
- }
- if len(garbage) != 1 {
- t.Fatalf("unexpected garbage: %d", len(garbage))
- }
-
-}
-
-func TestCompactFileChunks2(t *testing.T) {
-
- chunks := []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 50},
- {Offset: 100, Size: 100, FileId: "def", Mtime: 100},
- {Offset: 200, Size: 100, FileId: "ghi", Mtime: 200},
- {Offset: 0, Size: 100, FileId: "abcf", Mtime: 300},
- {Offset: 50, Size: 100, FileId: "fhfh", Mtime: 400},
- {Offset: 100, Size: 100, FileId: "yuyu", Mtime: 500},
- }
-
- k := 3
-
- for n := 0; n < k; n++ {
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n),
- })
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k),
- })
- }
-
- compacted, garbage := CompactFileChunks(nil, chunks)
-
- if len(compacted) != 4 {
- t.Fatalf("unexpected compacted: %d", len(compacted))
- }
- if len(garbage) != 8 {
- t.Fatalf("unexpected garbage: %d", len(garbage))
- }
-}
-
-func TestRandomFileChunksCompact(t *testing.T) {
-
- data := make([]byte, 1024)
-
- var chunks []*filer_pb.FileChunk
- for i := 0; i < 15; i++ {
- start, stop := rand.Intn(len(data)), rand.Intn(len(data))
- if start > stop {
- start, stop = stop, start
- }
- if start+16 < stop {
- stop = start + 16
- }
- chunk := &filer_pb.FileChunk{
- FileId: strconv.Itoa(i),
- Offset: int64(start),
- Size: uint64(stop - start),
- Mtime: int64(i),
- Fid: &filer_pb.FileId{FileKey: uint64(i)},
- }
- chunks = append(chunks, chunk)
- for x := start; x < stop; x++ {
- data[x] = byte(i)
- }
- }
-
- visibles, _ := NonOverlappingVisibleIntervals(nil, chunks)
-
- for _, v := range visibles {
- for x := v.start; x < v.stop; x++ {
- assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId)
- }
- }
-
-}
-
-func TestIntervalMerging(t *testing.T) {
-
- testcases := []struct {
- Chunks []*filer_pb.FileChunk
- Expected []*VisibleInterval
- }{
- // case 0: normal
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
- {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 100, fileId: "abc"},
- {start: 100, stop: 200, fileId: "asdf"},
- {start: 200, stop: 300, fileId: "fsad"},
- },
- },
- // case 1: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "asdf"},
- },
- },
- // case 2: updates overwrite part of previous chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
- {Offset: 0, Size: 70, FileId: "b", Mtime: 134},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 70, fileId: "b"},
- {start: 70, stop: 100, fileId: "a", chunkOffset: 70},
- },
- },
- // case 3: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 50, fileId: "asdf"},
- {start: 50, stop: 300, fileId: "xxxx"},
- },
- },
- // case 4: updates far away from prev chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "asdf"},
- {start: 250, stop: 500, fileId: "xxxx"},
- },
- },
- // case 5: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "d", Mtime: 184},
- {Offset: 70, Size: 150, FileId: "c", Mtime: 143},
- {Offset: 80, Size: 100, FileId: "b", Mtime: 134},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "d"},
- {start: 200, stop: 220, fileId: "c", chunkOffset: 130},
- },
- },
- // case 6: same updates
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123},
- {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123},
- {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 100, fileId: "xyz"},
- },
- },
- // case 7: real updates
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 2097152, FileId: "7,0294cbb9892b", Mtime: 123},
- {Offset: 0, Size: 3145728, FileId: "3,029565bf3092", Mtime: 130},
- {Offset: 2097152, Size: 3145728, FileId: "6,029632f47ae2", Mtime: 140},
- {Offset: 5242880, Size: 3145728, FileId: "2,029734c5aa10", Mtime: 150},
- {Offset: 8388608, Size: 3145728, FileId: "5,02982f80de50", Mtime: 160},
- {Offset: 11534336, Size: 2842193, FileId: "7,0299ad723803", Mtime: 170},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 2097152, fileId: "3,029565bf3092"},
- {start: 2097152, stop: 5242880, fileId: "6,029632f47ae2"},
- {start: 5242880, stop: 8388608, fileId: "2,029734c5aa10"},
- {start: 8388608, stop: 11534336, fileId: "5,02982f80de50"},
- {start: 11534336, stop: 14376529, fileId: "7,0299ad723803"},
- },
- },
- // case 8: real bug
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 77824, FileId: "4,0b3df938e301", Mtime: 123},
- {Offset: 471040, Size: 472225 - 471040, FileId: "6,0b3e0650019c", Mtime: 130},
- {Offset: 77824, Size: 208896 - 77824, FileId: "4,0b3f0c7202f0", Mtime: 140},
- {Offset: 208896, Size: 339968 - 208896, FileId: "2,0b4031a72689", Mtime: 150},
- {Offset: 339968, Size: 471040 - 339968, FileId: "3,0b416a557362", Mtime: 160},
- },
- Expected: []*VisibleInterval{
- {start: 0, stop: 77824, fileId: "4,0b3df938e301"},
- {start: 77824, stop: 208896, fileId: "4,0b3f0c7202f0"},
- {start: 208896, stop: 339968, fileId: "2,0b4031a72689"},
- {start: 339968, stop: 471040, fileId: "3,0b416a557362"},
- {start: 471040, stop: 472225, fileId: "6,0b3e0650019c"},
- },
- },
- }
-
- for i, testcase := range testcases {
- log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
- intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
- for x, interval := range intervals {
- log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
- i, x, interval.start, interval.stop, interval.fileId)
- }
- for x, interval := range intervals {
- if interval.start != testcase.Expected[x].start {
- t.Fatalf("failed on test case %d, interval %d, start %d, expect %d",
- i, x, interval.start, testcase.Expected[x].start)
- }
- if interval.stop != testcase.Expected[x].stop {
- t.Fatalf("failed on test case %d, interval %d, stop %d, expect %d",
- i, x, interval.stop, testcase.Expected[x].stop)
- }
- if interval.fileId != testcase.Expected[x].fileId {
- t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
- i, x, interval.fileId, testcase.Expected[x].fileId)
- }
- if interval.chunkOffset != testcase.Expected[x].chunkOffset {
- t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d",
- i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset)
- }
- }
- if len(intervals) != len(testcase.Expected) {
- t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
- }
-
- }
-
-}
-
-func TestChunksReading(t *testing.T) {
-
- testcases := []struct {
- Chunks []*filer_pb.FileChunk
- Offset int64
- Size int64
- Expected []*ChunkView
- }{
- // case 0: normal
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
- {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
- },
- Offset: 0,
- Size: 250,
- Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
- {Offset: 0, Size: 50, FileId: "fsad", LogicOffset: 200},
- },
- },
- // case 1: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- },
- Offset: 50,
- Size: 100,
- Expected: []*ChunkView{
- {Offset: 50, Size: 100, FileId: "asdf", LogicOffset: 50},
- },
- },
- // case 2: updates overwrite part of previous chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 3, Size: 100, FileId: "a", Mtime: 123},
- {Offset: 10, Size: 50, FileId: "b", Mtime: 134},
- },
- Offset: 30,
- Size: 40,
- Expected: []*ChunkView{
- {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30},
- {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60},
- },
- },
- // case 3: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 50, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Offset: 0,
- Size: 200,
- Expected: []*ChunkView{
- {Offset: 0, Size: 50, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 50},
- },
- },
- // case 4: updates far away from prev chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 250, Size: 250, FileId: "xxxx", Mtime: 154},
- },
- Offset: 0,
- Size: 400,
- Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250},
- },
- },
- // case 5: updates overwrite full chunks
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "c", Mtime: 184},
- {Offset: 70, Size: 150, FileId: "b", Mtime: 143},
- {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
- },
- Offset: 0,
- Size: 220,
- Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0},
- {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200},
- },
- },
- // case 6: same updates
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123},
- {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123},
- {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123},
- },
- Offset: 0,
- Size: 100,
- Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0},
- },
- },
- // case 7: edge cases
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 100, Size: 100, FileId: "asdf", Mtime: 134},
- {Offset: 200, Size: 100, FileId: "fsad", Mtime: 353},
- },
- Offset: 0,
- Size: 200,
- Expected: []*ChunkView{
- {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 100},
- },
- },
- // case 8: edge cases
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 90, Size: 200, FileId: "asdf", Mtime: 134},
- {Offset: 190, Size: 300, FileId: "fsad", Mtime: 353},
- },
- Offset: 0,
- Size: 300,
- Expected: []*ChunkView{
- {Offset: 0, Size: 90, FileId: "abc", LogicOffset: 0},
- {Offset: 0, Size: 100, FileId: "asdf", LogicOffset: 90},
- {Offset: 0, Size: 110, FileId: "fsad", LogicOffset: 190},
- },
- },
- // case 9: edge cases
- {
- Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1},
- {Offset: 43175936, Size: 52981771 - 43175936, FileId: "2,112a36ea7f85", Mtime: 2},
- {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3},
- {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4},
- {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5},
- {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6},
- },
- Offset: 0,
- Size: 153578836,
- Expected: []*ChunkView{
- {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0},
- {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936},
- {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760},
- {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736},
- {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168},
- {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248},
- },
- },
- }
-
- for i, testcase := range testcases {
- if i != 2 {
- // continue
- }
- log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
- chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
- for x, chunk := range chunks {
- log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
- i, x, chunk.Offset, chunk.Size, chunk.FileId)
- if chunk.Offset != testcase.Expected[x].Offset {
- t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d",
- i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset)
- }
- if chunk.Size != testcase.Expected[x].Size {
- t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d",
- i, chunk.FileId, chunk.Size, testcase.Expected[x].Size)
- }
- if chunk.FileId != testcase.Expected[x].FileId {
- t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
- i, x, chunk.FileId, testcase.Expected[x].FileId)
- }
- if chunk.LogicOffset != testcase.Expected[x].LogicOffset {
- t.Fatalf("failed on read case %d, chunk %d, LogicOffset %d, expect %d",
- i, x, chunk.LogicOffset, testcase.Expected[x].LogicOffset)
- }
- }
- if len(chunks) != len(testcase.Expected) {
- t.Fatalf("failed to read test case %d, len %d expected %d", i, len(chunks), len(testcase.Expected))
- }
- }
-
-}
-
-func BenchmarkCompactFileChunks(b *testing.B) {
-
- var chunks []*filer_pb.FileChunk
-
- k := 1024
-
- for n := 0; n < k; n++ {
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 100), Size: 100, FileId: fmt.Sprintf("fileId%d", n), Mtime: int64(n),
- })
- chunks = append(chunks, &filer_pb.FileChunk{
- Offset: int64(n * 50), Size: 100, FileId: fmt.Sprintf("fileId%d", n+k), Mtime: int64(n + k),
- })
- }
-
- for n := 0; n < b.N; n++ {
- CompactFileChunks(nil, chunks)
- }
-}
-
-func TestViewFromVisibleIntervals(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 0,
- stop: 25,
- fileId: "fid1",
- },
- {
- start: 4096,
- stop: 8192,
- fileId: "fid2",
- },
- {
- start: 16384,
- stop: 18551,
- fileId: "fid3",
- },
- }
-
- views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
-
- if len(views) != len(visibles) {
- assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
- }
-
-}
-
-func TestViewFromVisibleIntervals2(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 344064,
- stop: 348160,
- fileId: "fid1",
- },
- {
- start: 348160,
- stop: 356352,
- fileId: "fid2",
- },
- }
-
- views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
-
- if len(views) != len(visibles) {
- assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
- }
-
-}
-
-func TestViewFromVisibleIntervals3(t *testing.T) {
- visibles := []VisibleInterval{
- {
- start: 1000,
- stop: 2000,
- fileId: "fid1",
- },
- {
- start: 3000,
- stop: 4000,
- fileId: "fid2",
- },
- }
-
- views := ViewFromVisibleIntervals(visibles, 1700, 1500)
-
- if len(views) != len(visibles) {
- assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
- }
-
-}
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
deleted file mode 100644
index 78e27ed08..000000000
--- a/weed/filer2/filer.go
+++ /dev/null
@@ -1,290 +0,0 @@
-package filer2
-
-import (
- "context"
- "errors"
- "fmt"
- "os"
- "strings"
- "time"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-const (
- LogFlushInterval = time.Minute
- PaginationSize = 1024 * 256
-)
-
-var (
- OS_UID = uint32(os.Getuid())
- OS_GID = uint32(os.Getgid())
- ErrUnsupportedListDirectoryPrefixed = errors.New("UNSUPPORTED")
-)
-
-type Filer struct {
- Store *FilerStoreWrapper
- MasterClient *wdclient.MasterClient
- fileIdDeletionQueue *util.UnboundedQueue
- GrpcDialOption grpc.DialOption
- DirBucketsPath string
- FsyncBuckets []string
- buckets *FilerBuckets
- Cipher bool
- LocalMetaLogBuffer *log_buffer.LogBuffer
- metaLogCollection string
- metaLogReplication string
- MetaAggregator *MetaAggregator
- Signature int32
-}
-
-func NewFiler(masters []string, grpcDialOption grpc.DialOption,
- filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
- f := &Filer{
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
- fileIdDeletionQueue: util.NewUnboundedQueue(),
- GrpcDialOption: grpcDialOption,
- Signature: util.RandomInt32(),
- }
- f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
- f.metaLogCollection = collection
- f.metaLogReplication = replication
-
- go f.loopProcessingDeletion()
-
- return f
-}
-
-func (f *Filer) AggregateFromPeers(self string, filers []string) {
-
- // set peers
- if len(filers) == 0 {
- filers = append(filers, self)
- }
- f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
- f.MetaAggregator.StartLoopSubscribe(f, self)
-
-}
-
-func (f *Filer) SetStore(store FilerStore) {
- f.Store = NewFilerStoreWrapper(store)
-}
-
-func (f *Filer) GetStore() (store FilerStore) {
- return f.Store
-}
-
-func (fs *Filer) GetMaster() string {
- return fs.MasterClient.GetMaster()
-}
-
-func (fs *Filer) KeepConnectedToMaster() {
- fs.MasterClient.KeepConnectedToMaster()
-}
-
-func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
- return f.Store.BeginTransaction(ctx)
-}
-
-func (f *Filer) CommitTransaction(ctx context.Context) error {
- return f.Store.CommitTransaction(ctx)
-}
-
-func (f *Filer) RollbackTransaction(ctx context.Context) error {
- return f.Store.RollbackTransaction(ctx)
-}
-
-func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
-
- if string(entry.FullPath) == "/" {
- return nil
- }
-
- dirParts := strings.Split(string(entry.FullPath), "/")
-
- // fmt.Printf("directory parts: %+v\n", dirParts)
-
- var lastDirectoryEntry *Entry
-
- for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + util.Join(dirParts[:i]...)
- // fmt.Printf("%d directory: %+v\n", i, dirPath)
-
- // check the store directly
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
-
- // no such existing directory
- if dirEntry == nil {
-
- // create the directory
- now := time.Now()
-
- dirEntry = &Entry{
- FullPath: util.FullPath(dirPath),
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0110,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
- },
- }
-
- glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
- if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
- glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
- return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
- }
- } else {
- f.maybeAddBucket(dirEntry)
- f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
- }
-
- } else if !dirEntry.IsDirectory() {
- glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
- return fmt.Errorf("%s is a file", dirPath)
- }
-
- // remember the direct parent directory entry
- if i == len(dirParts)-1 {
- lastDirectoryEntry = dirEntry
- }
-
- }
-
- if lastDirectoryEntry == nil {
- glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
- return fmt.Errorf("parent folder not found: %v", entry.FullPath)
- }
-
- /*
- if !hasWritePermission(lastDirectoryEntry, entry) {
- glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
- lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
- return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
- }
- */
-
- oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
-
- glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl)
- if oldEntry == nil {
- if err := f.Store.InsertEntry(ctx, entry); err != nil {
- glog.Errorf("insert entry %s: %v", entry.FullPath, err)
- return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
- }
- } else {
- if o_excl {
- glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
- return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
- }
- if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
- glog.Errorf("update entry %s: %v", entry.FullPath, err)
- return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
- }
- }
-
- f.maybeAddBucket(entry)
- f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
-
- f.deleteChunksIfNotNew(oldEntry, entry)
-
- glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
-
- return nil
-}
-
-func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
- if oldEntry != nil {
- if oldEntry.IsDirectory() && !entry.IsDirectory() {
- glog.Errorf("existing %s is a directory", entry.FullPath)
- return fmt.Errorf("existing %s is a directory", entry.FullPath)
- }
- if !oldEntry.IsDirectory() && entry.IsDirectory() {
- glog.Errorf("existing %s is a file", entry.FullPath)
- return fmt.Errorf("existing %s is a file", entry.FullPath)
- }
- }
- return f.Store.UpdateEntry(ctx, entry)
-}
-
-func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
-
- now := time.Now()
-
- if string(p) == "/" {
- return &Entry{
- FullPath: p,
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | 0755,
- Uid: OS_UID,
- Gid: OS_GID,
- },
- }, nil
- }
- entry, err = f.Store.FindEntry(ctx, p)
- if entry != nil && entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
- return nil, filer_pb.ErrNotFound
- }
- }
- return
-
-}
-
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) {
- if strings.HasSuffix(string(p), "/") && len(p) > 1 {
- p = p[0 : len(p)-1]
- }
-
- var makeupEntries []*Entry
- entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
- for expiredCount > 0 && err == nil {
- makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
- if err == nil {
- entries = append(entries, makeupEntries...)
- }
- }
-
- return entries, err
-}
-
-func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
- listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
- if listErr != nil {
- return listedEntries, expiredCount, "", listErr
- }
- for _, entry := range listedEntries {
- lastFileName = entry.Name()
- if entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
- expiredCount++
- continue
- }
- }
- entries = append(entries, entry)
- }
- return
-}
-
-func (f *Filer) Shutdown() {
- f.LocalMetaLogBuffer.Shutdown()
- f.Store.Shutdown()
-}
diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go
deleted file mode 100644
index 6b7c2c31a..000000000
--- a/weed/filer2/filer_buckets.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package filer2
-
-import (
- "context"
- "math"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type BucketName string
-type BucketOption struct {
- Name BucketName
- Replication string
- fsync bool
-}
-type FilerBuckets struct {
- dirBucketsPath string
- buckets map[BucketName]*BucketOption
- sync.RWMutex
-}
-
-func (f *Filer) LoadBuckets() {
-
- f.buckets = &FilerBuckets{
- buckets: make(map[BucketName]*BucketOption),
- }
-
- limit := math.MaxInt32
-
- entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
-
- if err != nil {
- glog.V(1).Infof("no buckets found: %v", err)
- return
- }
-
- shouldFsyncMap := make(map[string]bool)
- for _, bucket := range f.FsyncBuckets {
- shouldFsyncMap[bucket] = true
- }
-
- glog.V(1).Infof("buckets found: %d", len(entries))
-
- f.buckets.Lock()
- for _, entry := range entries {
- _, shouldFsnyc := shouldFsyncMap[entry.Name()]
- f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
- Name: BucketName(entry.Name()),
- Replication: entry.Replication,
- fsync: shouldFsnyc,
- }
- }
- f.buckets.Unlock()
-
-}
-
-func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
-
- f.buckets.RLock()
- defer f.buckets.RUnlock()
-
- option, found := f.buckets.buckets[BucketName(buketName)]
-
- if !found {
- return "", false
- }
- return option.Replication, option.fsync
-
-}
-
-func (f *Filer) isBucket(entry *Entry) bool {
- if !entry.IsDirectory() {
- return false
- }
- parent, dirName := entry.FullPath.DirAndName()
- if parent != f.DirBucketsPath {
- return false
- }
-
- f.buckets.RLock()
- defer f.buckets.RUnlock()
-
- _, found := f.buckets.buckets[BucketName(dirName)]
-
- return found
-
-}
-
-func (f *Filer) maybeAddBucket(entry *Entry) {
- if !entry.IsDirectory() {
- return
- }
- parent, dirName := entry.FullPath.DirAndName()
- if parent != f.DirBucketsPath {
- return
- }
- f.addBucket(dirName, &BucketOption{
- Name: BucketName(dirName),
- Replication: entry.Replication,
- })
-}
-
-func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {
-
- f.buckets.Lock()
- defer f.buckets.Unlock()
-
- f.buckets.buckets[BucketName(buketName)] = bucketOption
-
-}
-
-func (f *Filer) deleteBucket(buketName string) {
-
- f.buckets.Lock()
- defer f.buckets.Unlock()
-
- delete(f.buckets.buckets, BucketName(buketName))
-
-}
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
deleted file mode 100644
index 4d2a1ef43..000000000
--- a/weed/filer2/filer_delete_entry.go
+++ /dev/null
@@ -1,129 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
- if p == "/" {
- return nil
- }
-
- entry, findErr := f.FindEntry(ctx, p)
- if findErr != nil {
- return findErr
- }
-
- isCollection := f.isBucket(entry)
-
- var chunks []*filer_pb.FileChunk
- chunks = append(chunks, entry.Chunks...)
- if entry.IsDirectory() {
- // delete the folder children, not including the folder itself
- var dirChunks []*filer_pb.FileChunk
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster)
- if err != nil {
- glog.V(0).Infof("delete directory %s: %v", p, err)
- return fmt.Errorf("delete directory %s: %v", p, err)
- }
- chunks = append(chunks, dirChunks...)
- }
-
- // delete the file or folder
- err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures)
- if err != nil {
- return fmt.Errorf("delete file %s: %v", p, err)
- }
-
- if shouldDeleteChunks && !isCollection {
- go f.DeleteChunks(chunks)
- }
- if isCollection {
- collectionName := entry.Name()
- f.doDeleteCollection(collectionName)
- f.deleteBucket(collectionName)
- }
-
- return nil
-}
-
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) {
-
- lastFileName := ""
- includeLastFile := false
- for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
- if err != nil {
- glog.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
- }
- if lastFileName == "" && !isRecursive && len(entries) > 0 {
- // only for first iteration in the loop
- glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
- }
-
- for _, sub := range entries {
- lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- if sub.IsDirectory() {
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
- chunks = append(chunks, dirChunks...)
- } else {
- f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
- chunks = append(chunks, sub.Chunks...)
- }
- if err != nil && !ignoreRecursiveError {
- return nil, err
- }
- }
-
- if len(entries) < PaginationSize {
- break
- }
- }
-
- glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
-
- if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
- return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
-
- f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil)
-
- return chunks, nil
-}
-
-func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {
-
- glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
-
- if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
- return fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- if !entry.IsDirectory() {
- f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
- }
-
- return nil
-}
-
-func (f *Filer) doDeleteCollection(collectionName string) (err error) {
-
- return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
- Name: collectionName,
- })
- if err != nil {
- glog.Infof("delete collection %s: %v", collectionName, err)
- }
- return err
- })
-
-}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
deleted file mode 100644
index dbee4a61d..000000000
--- a/weed/filer2/filer_deletion.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package filer2
-
-import (
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) {
- return func(vids []string) (map[string]operation.LookupResult, error) {
- m := make(map[string]operation.LookupResult)
- for _, vid := range vids {
- locs, _ := masterClient.GetVidLocations(vid)
- var locations []operation.Location
- for _, loc := range locs {
- locations = append(locations, operation.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- })
- }
- m[vid] = operation.LookupResult{
- VolumeId: vid,
- Locations: locations,
- }
- }
- return m, nil
- }
-}
-
-func (f *Filer) loopProcessingDeletion() {
-
- lookupFunc := LookupByMasterClientFn(f.MasterClient)
-
- DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
-
- var deletionCount int
- for {
- deletionCount = 0
- f.fileIdDeletionQueue.Consume(func(fileIds []string) {
- for len(fileIds) > 0 {
- var toDeleteFileIds []string
- if len(fileIds) > DeletionBatchSize {
- toDeleteFileIds = fileIds[:DeletionBatchSize]
- fileIds = fileIds[DeletionBatchSize:]
- } else {
- toDeleteFileIds = fileIds
- fileIds = fileIds[:0]
- }
- deletionCount = len(toDeleteFileIds)
- _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
- if err != nil {
- if !strings.Contains(err.Error(), "already deleted") {
- glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
- }
- } else {
- glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
- }
- }
- })
-
- if deletionCount == 0 {
- time.Sleep(1123 * time.Millisecond)
- }
- }
-}
-
-func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
- for _, chunk := range chunks {
- if !chunk.IsChunkManifest {
- f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
- continue
- }
- dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
- if manifestResolveErr != nil {
- glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
- }
- for _, dChunk := range dataChunks {
- f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
- }
- f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
- }
-}
-
-func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
-
- if oldEntry == nil {
- return
- }
- if newEntry == nil {
- f.DeleteChunks(oldEntry.Chunks)
- }
-
- var toDelete []*filer_pb.FileChunk
- newChunkIds := make(map[string]bool)
- for _, newChunk := range newEntry.Chunks {
- newChunkIds[newChunk.GetFileIdString()] = true
- }
-
- for _, oldChunk := range oldEntry.Chunks {
- if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
- toDelete = append(toDelete, oldChunk)
- }
- }
- f.DeleteChunks(toDelete)
-}
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
deleted file mode 100644
index a95fcf0b4..000000000
--- a/weed/filer2/filer_notify.go
+++ /dev/null
@@ -1,169 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "io"
- "strings"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/notification"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) {
- var fullpath string
- if oldEntry != nil {
- fullpath = string(oldEntry.FullPath)
- } else if newEntry != nil {
- fullpath = string(newEntry.FullPath)
- } else {
- return
- }
-
- // println("fullpath:", fullpath)
-
- if strings.HasPrefix(fullpath, SystemLogDir) {
- return
- }
-
- newParentPath := ""
- if newEntry != nil {
- newParentPath, _ = newEntry.FullPath.DirAndName()
- }
- eventNotification := &filer_pb.EventNotification{
- OldEntry: oldEntry.ToProtoEntry(),
- NewEntry: newEntry.ToProtoEntry(),
- DeleteChunks: deleteChunks,
- NewParentPath: newParentPath,
- IsFromOtherCluster: isFromOtherCluster,
- Signatures: append(signatures, f.Signature),
- }
-
- if notification.Queue != nil {
- glog.V(3).Infof("notifying entry update %v", fullpath)
- notification.Queue.SendMessage(fullpath, eventNotification)
- }
-
- f.logMetaEvent(ctx, fullpath, eventNotification)
-
-}
-
-func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) {
-
- dir, _ := util.FullPath(fullpath).DirAndName()
-
- event := &filer_pb.SubscribeMetadataResponse{
- Directory: dir,
- EventNotification: eventNotification,
- TsNs: time.Now().UnixNano(),
- }
- data, err := proto.Marshal(event)
- if err != nil {
- glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
- return
- }
-
- f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
-
-}
-
-func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
-
- startTime, stopTime = startTime.UTC(), stopTime.UTC()
-
- targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- // startTime.Second(), startTime.Nanosecond(),
- )
-
- for {
- if err := f.appendToFile(targetFile, buf); err != nil {
- glog.V(1).Infof("log write failed %s: %v", targetFile, err)
- time.Sleep(737 * time.Millisecond)
- } else {
- break
- }
- }
-}
-
-func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
-
- startTime = startTime.UTC()
- startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
-
- sizeBuf := make([]byte, 4)
- startTsNs := startTime.UnixNano()
-
- dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "")
- if listDayErr != nil {
- return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
- }
- for _, dayEntry := range dayEntries {
- // println("checking day", dayEntry.FullPath)
- hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "")
- if listHourMinuteErr != nil {
- return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
- }
- for _, hourMinuteEntry := range hourMinuteEntries {
- // println("checking hh-mm", hourMinuteEntry.FullPath)
- if dayEntry.Name() == startDate {
- if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
- continue
- }
- }
- // println("processing", hourMinuteEntry.FullPath)
- chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks)
- if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
- chunkedFileReader.Close()
- if err == io.EOF {
- continue
- }
- return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
- }
- chunkedFileReader.Close()
- }
- }
-
- return lastTsNs, nil
-}
-
-func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
- for {
- n, err := r.Read(sizeBuf)
- if err != nil {
- return lastTsNs, err
- }
- if n != 4 {
- return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n)
- }
- size := util.BytesToUint32(sizeBuf)
- // println("entry size", size)
- entryData := make([]byte, size)
- n, err = r.Read(entryData)
- if err != nil {
- return lastTsNs, err
- }
- if n != int(size) {
- return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
- }
- logEntry := &filer_pb.LogEntry{}
- if err = proto.Unmarshal(entryData, logEntry); err != nil {
- return lastTsNs, err
- }
- if logEntry.TsNs <= ns {
- return lastTsNs, nil
- }
- // println("each log: ", logEntry.TsNs)
- if err := eachLogEntryFn(logEntry); err != nil {
- return lastTsNs, err
- } else {
- lastTsNs = logEntry.TsNs
- }
- }
-}
diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go
deleted file mode 100644
index 31cdb3c1c..000000000
--- a/weed/filer2/filer_notify_append.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "os"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (f *Filer) appendToFile(targetFile string, data []byte) error {
-
- assignResult, uploadResult, err2 := f.assignAndUpload(data)
- if err2 != nil {
- return err2
- }
-
- // find out existing entry
- fullpath := util.FullPath(targetFile)
- entry, err := f.FindEntry(context.Background(), fullpath)
- var offset int64 = 0
- if err == filer_pb.ErrNotFound {
- entry = &Entry{
- FullPath: fullpath,
- Attr: Attr{
- Crtime: time.Now(),
- Mtime: time.Now(),
- Mode: os.FileMode(0644),
- Uid: OS_UID,
- Gid: OS_GID,
- },
- }
- } else {
- offset = int64(TotalSize(entry.Chunks))
- }
-
- // append to existing chunks
- entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
-
- // update the entry
- err = f.CreateEntry(context.Background(), entry, false, false, nil)
-
- return err
-}
-
-func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
- // assign a volume location
- assignRequest := &operation.VolumeAssignRequest{
- Count: 1,
- Collection: f.metaLogCollection,
- Replication: f.metaLogReplication,
- WritableVolumeCount: 1,
- }
- assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
- if err != nil {
- return nil, nil, fmt.Errorf("AssignVolume: %v", err)
- }
- if assignResult.Error != "" {
- return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
- }
-
- // upload data
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
- if err != nil {
- return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
- }
- // println("uploaded to", targetUrl)
- return assignResult, uploadResult, nil
-}
diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go
deleted file mode 100644
index 29170bfdf..000000000
--- a/weed/filer2/filer_notify_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package filer2
-
-import (
- "testing"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-
- "github.com/golang/protobuf/proto"
-)
-
-func TestProtoMarshalText(t *testing.T) {
-
- oldEntry := &Entry{
- FullPath: util.FullPath("/this/path/to"),
- Attr: Attr{
- Mtime: time.Now(),
- Mode: 0644,
- Uid: 1,
- Mime: "text/json",
- TtlSec: 25,
- },
- Chunks: []*filer_pb.FileChunk{
- &filer_pb.FileChunk{
- FileId: "234,2423423422",
- Offset: 234234,
- Size: 234,
- Mtime: 12312423,
- ETag: "2342342354",
- SourceFileId: "23234,2342342342",
- },
- },
- }
-
- notification := &filer_pb.EventNotification{
- OldEntry: oldEntry.ToProtoEntry(),
- NewEntry: nil,
- DeleteChunks: true,
- }
-
- text := proto.MarshalTextString(notification)
-
- notification2 := &filer_pb.EventNotification{}
- proto.UnmarshalText(text, notification2)
-
- if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId {
- t.Fatalf("marshal/unmarshal error: %s", text)
- }
-
- println(text)
-
-}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
deleted file mode 100644
index dd5abe421..000000000
--- a/weed/filer2/filerstore.go
+++ /dev/null
@@ -1,208 +0,0 @@
-package filer2
-
-import (
- "context"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type FilerStore interface {
- // GetName gets the name to locate the configuration in filer.toml file
- GetName() string
- // Initialize initializes the file store
- Initialize(configuration util.Configuration, prefix string) error
- InsertEntry(context.Context, *Entry) error
- UpdateEntry(context.Context, *Entry) (err error)
- // err == filer2.ErrNotFound if not found
- FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
- DeleteEntry(context.Context, util.FullPath) (err error)
- DeleteFolderChildren(context.Context, util.FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
- ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
-
- BeginTransaction(ctx context.Context) (context.Context, error)
- CommitTransaction(ctx context.Context) error
- RollbackTransaction(ctx context.Context) error
-
- Shutdown()
-}
-
-type FilerLocalStore interface {
- UpdateOffset(filer string, lastTsNs int64) error
- ReadOffset(filer string) (lastTsNs int64, err error)
-}
-
-type FilerStoreWrapper struct {
- ActualStore FilerStore
-}
-
-func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
- if innerStore, ok := store.(*FilerStoreWrapper); ok {
- return innerStore
- }
- return &FilerStoreWrapper{
- ActualStore: store,
- }
-}
-
-func (fsw *FilerStoreWrapper) GetName() string {
- return fsw.ActualStore.GetName()
-}
-
-func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
- return fsw.ActualStore.Initialize(configuration, prefix)
-}
-
-func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- if entry.Mime == "application/octet-stream" {
- entry.Mime = ""
- }
- return fsw.ActualStore.InsertEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- if entry.Mime == "application/octet-stream" {
- entry.Mime = ""
- }
- return fsw.ActualStore.UpdateEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
- }()
-
- entry, err = fsw.ActualStore.FindEntry(ctx, fp)
- if err != nil {
- return nil, err
- }
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- return
-}
-
-func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
- }()
-
- return fsw.ActualStore.DeleteEntry(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
- }()
-
- return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
- }()
-
- entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, err
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
- }()
- entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- if err == ErrUnsupportedListDirectoryPrefixed {
- entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- }
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, nil
-}
-
-func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
- entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
-
- if prefix == "" {
- return
- }
-
- count := 0
- var lastFileName string
- notPrefixed := entries
- entries = nil
- for count < limit && len(notPrefixed) > 0 {
- for _, entry := range notPrefixed {
- lastFileName = entry.Name()
- if strings.HasPrefix(entry.Name(), prefix) {
- count++
- entries = append(entries, entry)
- if count >= limit {
- break
- }
- }
- }
- if count < limit {
- notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
- return fsw.ActualStore.BeginTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
- return fsw.ActualStore.CommitTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
- return fsw.ActualStore.RollbackTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) Shutdown() {
- fsw.ActualStore.Shutdown()
-}
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
deleted file mode 100644
index 1c08d2831..000000000
--- a/weed/filer2/leveldb/leveldb_store.go
+++ /dev/null
@@ -1,231 +0,0 @@
-package leveldb
-
-import (
- "bytes"
- "context"
- "fmt"
- "github.com/syndtr/goleveldb/leveldb"
- leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/opt"
- leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const (
- DIR_FILE_SEPARATOR = byte(0x00)
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &LevelDBStore{})
-}
-
-type LevelDBStore struct {
- db *leveldb.DB
-}
-
-func (store *LevelDBStore) GetName() string {
- return "leveldb"
-}
-
-func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
- dir := configuration.GetString(prefix + "dir")
- return store.initialize(dir)
-}
-
-func (store *LevelDBStore) initialize(dir string) (err error) {
- glog.Infof("filer store dir: %s", dir)
- if err := weed_util.TestFolderWritable(dir); err != nil {
- return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
- }
-
- opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10,
- }
-
- if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
- if leveldb_errors.IsCorrupted(err) {
- store.db, err = leveldb.RecoverFile(dir, opts)
- }
- if err != nil {
- glog.Infof("filer store open dir %s: %v", dir, err)
- return
- }
- }
- return
-}
-
-func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *LevelDBStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- key := genKey(entry.DirAndName())
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- err = store.db.Put(key, value, nil)
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
-
- return nil
-}
-
-func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
- key := genKey(fullpath.DirAndName())
-
- data, err := store.db.Get(key, nil)
-
- if err == leveldb.ErrNotFound {
- return nil, filer_pb.ErrNotFound
- }
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
-
- return entry, nil
-}
-
-func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- key := genKey(fullpath.DirAndName())
-
- err = store.db.Delete(key, nil)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
-
- batch := new(leveldb.Batch)
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
- iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- batch.Delete([]byte(genKey(string(fullpath), fileName)))
- }
- iter.Release()
-
- err = store.db.Write(batch, nil)
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
-
- iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
- iter.Release()
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string) (key []byte) {
- key = []byte(dirPath)
- key = append(key, DIR_FILE_SEPARATOR)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = []byte(string(fullpath))
- keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func getNameFromKey(key []byte) string {
-
- sepIndex := len(key) - 1
- for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
- sepIndex--
- }
-
- return string(key[sepIndex+1:])
-
-}
-
-func (store *LevelDBStore) Shutdown() {
- store.db.Close()
-}
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
deleted file mode 100644
index 86e8aa952..000000000
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package leveldb
-
-import (
- "context"
- "io/ioutil"
- "os"
- "testing"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
- store := &LevelDBStore{}
- store.initialize(dir)
- filer.SetStore(store)
-
- fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
- // checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
-
-func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
- store := &LevelDBStore{}
- store.initialize(dir)
- filer.SetStore(store)
-
- ctx := context.Background()
-
- // checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
- if len(entries) != 0 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/leveldb2/leveldb2_local_store.go b/weed/filer2/leveldb2/leveldb2_local_store.go
deleted file mode 100644
index 3625abf9e..000000000
--- a/weed/filer2/leveldb2/leveldb2_local_store.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package leveldb
-
-import (
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- _ = filer2.FilerLocalStore(&LevelDB2Store{})
-)
-
-func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
-
- value := make([]byte, 8)
- util.Uint64toBytes(value, uint64(lastTsNs))
-
- err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
-
- if err != nil {
- return fmt.Errorf("UpdateOffset %s : %v", filer, err)
- }
-
- println("UpdateOffset", filer, "lastTsNs", lastTsNs)
-
- return nil
-}
-
-func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
-
- value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
-
- if err != nil {
- return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
- }
-
- lastTsNs = int64(util.BytesToUint64(value))
-
- println("ReadOffset", filer, "lastTsNs", lastTsNs)
-
- return
-}
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
deleted file mode 100644
index ca9d6f04d..000000000
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ /dev/null
@@ -1,251 +0,0 @@
-package leveldb
-
-import (
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "github.com/syndtr/goleveldb/leveldb"
- leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/opt"
- leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
- "io"
- "os"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &LevelDB2Store{})
-}
-
-type LevelDB2Store struct {
- dbs []*leveldb.DB
- dbCount int
-}
-
-func (store *LevelDB2Store) GetName() string {
- return "leveldb2"
-}
-
-func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
- dir := configuration.GetString(prefix + "dir")
- return store.initialize(dir, 8)
-}
-
-func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
- glog.Infof("filer store leveldb2 dir: %s", dir)
- if err := weed_util.TestFolderWritable(dir); err != nil {
- return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
- }
-
- opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- }
-
- for d := 0; d < dbCount; d++ {
- dbFolder := fmt.Sprintf("%s/%02d", dir, d)
- os.MkdirAll(dbFolder, 0755)
- db, dbErr := leveldb.OpenFile(dbFolder, opts)
- if leveldb_errors.IsCorrupted(dbErr) {
- db, dbErr = leveldb.RecoverFile(dbFolder, opts)
- }
- if dbErr != nil {
- glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
- return dbErr
- }
- store.dbs = append(store.dbs, db)
- }
- store.dbCount = dbCount
-
- return
-}
-
-func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- dir, name := entry.DirAndName()
- key, partitionId := genKey(dir, name, store.dbCount)
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- err = store.dbs[partitionId].Put(key, value, nil)
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
-
- return nil
-}
-
-func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
- dir, name := fullpath.DirAndName()
- key, partitionId := genKey(dir, name, store.dbCount)
-
- data, err := store.dbs[partitionId].Get(key, nil)
-
- if err == leveldb.ErrNotFound {
- return nil, filer_pb.ErrNotFound
- }
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks(data)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
-
- return entry, nil
-}
-
-func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- dir, name := fullpath.DirAndName()
- key, partitionId := genKey(dir, name, store.dbCount)
-
- err = store.dbs[partitionId].Delete(key, nil)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
-
- batch := new(leveldb.Batch)
-
- iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- batch.Delete(append(directoryPrefix, []byte(fileName)...))
- }
- iter.Release()
-
- err = store.dbs[partitionId].Write(batch, nil)
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
- lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
-
- iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
- for iter.Next() {
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- fileName := getNameFromKey(key)
- if fileName == "" {
- continue
- }
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entry := &filer2.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
- }
-
- // println("list", entry.FullPath, "chunks", len(entry.Chunks))
-
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
- entries = append(entries, entry)
- }
- iter.Release()
-
- return entries, err
-}
-
-func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
- key, partitionId = hashToBytes(dirPath, dbCount)
- key = append(key, []byte(fileName)...)
- return key, partitionId
-}
-
-func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
- keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix, partitionId
-}
-
-func getNameFromKey(key []byte) string {
-
- return string(key[md5.Size:])
-
-}
-
-// hash directory, and use last byte for partitioning
-func hashToBytes(dir string, dbCount int) ([]byte, int) {
- h := md5.New()
- io.WriteString(h, dir)
-
- b := h.Sum(nil)
-
- x := b[len(b)-1]
-
- return b, int(x) % dbCount
-}
-
-func (store *LevelDB2Store) Shutdown() {
- for d := 0; d < store.dbCount; d++ {
- store.dbs[d].Close()
- }
-}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
deleted file mode 100644
index 62d2475fe..000000000
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package leveldb
-
-import (
- "context"
- "io/ioutil"
- "os"
- "testing"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
- store := &LevelDB2Store{}
- store.initialize(dir, 2)
- filer.SetStore(store)
-
- fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
- // checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
-
-func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil, nil, "", 0, "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
- store := &LevelDB2Store{}
- store.initialize(dir, 2)
- filer.SetStore(store)
-
- ctx := context.Background()
-
- // checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
- if len(entries) != 0 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go
deleted file mode 100644
index f2792bd26..000000000
--- a/weed/filer2/meta_aggregator.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "io"
- "sync"
- "time"
-
- "github.com/golang/protobuf/proto"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
-)
-
-type MetaAggregator struct {
- filers []string
- grpcDialOption grpc.DialOption
- MetaLogBuffer *log_buffer.LogBuffer
- // notifying clients
- ListenersLock sync.Mutex
- ListenersCond *sync.Cond
-}
-
-// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
-// The old data comes from what each LocalMetadata persisted on disk.
-func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
- t := &MetaAggregator{
- filers: filers,
- grpcDialOption: grpcDialOption,
- }
- t.ListenersCond = sync.NewCond(&t.ListenersLock)
- t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
- t.ListenersCond.Broadcast()
- })
- return t
-}
-
-func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
- for _, filer := range ma.filers {
- go ma.subscribeToOneFiler(f, self, filer)
- }
-}
-
-func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
-
- var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
- lastPersistTime := time.Now()
- changesSinceLastPersist := 0
- lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
-
- MaxChangeLimit := 100
-
- if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
- if self != filer {
-
- if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
- lastTsNs = prevTsNs
- }
-
- glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
- maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
- if err := Replay(f.Store.ActualStore, event); err != nil {
- glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
- return
- }
- changesSinceLastPersist++
- if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
- if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
- lastPersistTime = time.Now()
- changesSinceLastPersist = 0
- } else {
- glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
- }
- }
- }
- } else {
- glog.V(0).Infof("skipping following self: %v", self)
- }
- }
-
- processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
- data, err := proto.Marshal(event)
- if err != nil {
- glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
- return err
- }
- dir := event.Directory
- // println("received meta change", dir, "size", len(data))
- ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
- if maybeReplicateMetadataChange != nil {
- maybeReplicateMetadataChange(event)
- }
- return nil
- }
-
- for {
- err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
- ClientName: "filer:" + self,
- PathPrefix: "/",
- SinceNs: lastTsNs,
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("process %v: %v", resp, err)
- }
- lastTsNs = resp.TsNs
- }
- })
- if err != nil {
- glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
- time.Sleep(1733 * time.Millisecond)
- }
- }
-}
diff --git a/weed/filer2/meta_replay.go b/weed/filer2/meta_replay.go
deleted file mode 100644
index d9cdaa76a..000000000
--- a/weed/filer2/meta_replay.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package filer2
-
-import (
- "context"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- var oldPath util.FullPath
- var newEntry *Entry
- if message.OldEntry != nil {
- oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
- glog.V(4).Infof("deleting %v", oldPath)
- if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil {
- return err
- }
- }
-
- if message.NewEntry != nil {
- dir := resp.Directory
- if message.NewParentPath != "" {
- dir = message.NewParentPath
- }
- key := util.NewFullPath(dir, message.NewEntry.Name)
- glog.V(4).Infof("creating %v", key)
- newEntry = FromPbEntry(dir, message.NewEntry)
- if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil {
- return err
- }
- }
-
- return nil
-}
diff --git a/weed/filer2/mongodb/mongodb_store.go b/weed/filer2/mongodb/mongodb_store.go
deleted file mode 100644
index 661aa4ea0..000000000
--- a/weed/filer2/mongodb/mongodb_store.go
+++ /dev/null
@@ -1,214 +0,0 @@
-package mongodb
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/x/bsonx"
- "time"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &MongodbStore{})
-}
-
-type MongodbStore struct {
- connect *mongo.Client
- database string
- collectionName string
-}
-
-type Model struct {
- Directory string `bson:"directory"`
- Name string `bson:"name"`
- Meta []byte `bson:"meta"`
-}
-
-func (store *MongodbStore) GetName() string {
- return "mongodb"
-}
-
-func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- store.database = configuration.GetString(prefix + "database")
- store.collectionName = "filemeta"
- poolSize := configuration.GetInt(prefix + "option_pool_size")
- return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize))
-}
-
-func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) {
- ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
- opts := options.Client().ApplyURI(uri)
-
- if poolSize > 0 {
- opts.SetMaxPoolSize(poolSize)
- }
-
- client, err := mongo.Connect(ctx, opts)
- if err != nil {
- return err
- }
-
- c := client.Database(store.database).Collection(store.collectionName)
- err = store.indexUnique(c)
- store.connect = client
- return err
-}
-
-func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error {
- _, err := c.Indexes().CreateOne(context.Background(), index, opts)
- return err
-}
-
-func (store *MongodbStore) indexUnique(c *mongo.Collection) error {
- opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
-
- unique := new(bool)
- *unique = true
-
- index := mongo.IndexModel{
- Keys: bsonx.Doc{{Key: "directory", Value: bsonx.Int32(1)}, {Key: "name", Value: bsonx.Int32(1)}},
- Options: &options.IndexOptions{
- Unique: unique,
- },
- }
-
- return store.createIndex(c, index, opts)
-}
-
-func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-
-func (store *MongodbStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- dir, name := entry.FullPath.DirAndName()
- meta, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encode %s: %s", entry.FullPath, err)
- }
-
- c := store.connect.Database(store.database).Collection(store.collectionName)
-
- _, err = c.InsertOne(ctx, Model{
- Directory: dir,
- Name: name,
- Meta: meta,
- })
-
- return nil
-}
-
-func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
-
- dir, name := fullpath.DirAndName()
- var data Model
-
- var where = bson.M{"directory": dir, "name": name}
- err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
- if err != mongo.ErrNoDocuments && err != nil {
- return nil, filer_pb.ErrNotFound
- }
-
- if len(data.Meta) == 0 {
- return nil, filer_pb.ErrNotFound
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
-
- err = entry.DecodeAttributesAndChunks(data.Meta)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
-
- dir, name := fullpath.DirAndName()
-
- where := bson.M{"directory": dir, "name": name}
- _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
-
- where := bson.M{"directory": fullpath}
- _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- return nil
-}
-
-func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
-
- var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}
- if inclusive {
- where["name"] = bson.M{
- "$gte": startFileName,
- }
- }
- optLimit := int64(limit)
- opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
- cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
- for cur.Next(ctx) {
- var data Model
- err := cur.Decode(&data)
- if err != nil && err != mongo.ErrNoDocuments {
- return nil, err
- }
-
- entry := &filer2.Entry{
- FullPath: util.NewFullPath(string(fullpath), data.Name),
- }
- if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
- err = decodeErr
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- break
- }
-
- entries = append(entries, entry)
- }
-
- if err := cur.Close(ctx); err != nil {
- glog.V(0).Infof("list iterator close: %v", err)
- }
-
- return entries, err
-}
-
-func (store *MongodbStore) Shutdown() {
- ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
- store.connect.Disconnect(ctx)
-}
diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go
deleted file mode 100644
index 9f748445e..000000000
--- a/weed/filer2/mysql/mysql_store.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package mysql
-
-import (
- "database/sql"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
- "github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/go-sql-driver/mysql"
-)
-
-const (
- CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &MysqlStore{})
-}
-
-type MysqlStore struct {
- abstract_sql.AbstractSqlStore
-}
-
-func (store *MysqlStore) GetName() string {
- return "mysql"
-}
-
-func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- return store.initialize(
- configuration.GetString(prefix+"username"),
- configuration.GetString(prefix+"password"),
- configuration.GetString(prefix+"hostname"),
- configuration.GetInt(prefix+"port"),
- configuration.GetString(prefix+"database"),
- configuration.GetInt(prefix+"connection_max_idle"),
- configuration.GetInt(prefix+"connection_max_open"),
- configuration.GetBool(prefix+"interpolateParams"),
- )
-}
-
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
- interpolateParams bool) (err error) {
- //
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
- store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
-
- sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
- if interpolateParams {
- sqlUrl += "&interpolateParams=true"
- }
-
- var dbErr error
- store.DB, dbErr = sql.Open("mysql", sqlUrl)
- if dbErr != nil {
- store.DB.Close()
- store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
- }
-
- store.DB.SetMaxIdleConns(maxIdle)
- store.DB.SetMaxOpenConns(maxOpen)
-
- if err = store.DB.Ping(); err != nil {
- return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
- }
-
- return nil
-}
diff --git a/weed/filer2/permission.go b/weed/filer2/permission.go
deleted file mode 100644
index 8a9508fbc..000000000
--- a/weed/filer2/permission.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package filer2
-
-func hasWritePermission(dir *Entry, entry *Entry) bool {
-
- if dir == nil {
- return false
- }
-
- if dir.Uid == entry.Uid && dir.Mode&0200 > 0 {
- return true
- }
-
- if dir.Gid == entry.Gid && dir.Mode&0020 > 0 {
- return true
- }
-
- if dir.Mode&0002 > 0 {
- return true
- }
-
- return false
-}
diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt
deleted file mode 100644
index cb0c99c63..000000000
--- a/weed/filer2/postgres/README.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-
-1. create "seaweedfs" database
-
-export PGHOME=/Library/PostgreSQL/10
-$PGHOME/bin/createdb --username=postgres --password seaweedfs
-
-2. create "filemeta" table
-$PGHOME/bin/psql --username=postgres --password seaweedfs
-
-CREATE TABLE IF NOT EXISTS filemeta (
- dirhash BIGINT,
- name VARCHAR(65535),
- directory VARCHAR(65535),
- meta bytea,
- PRIMARY KEY (dirhash, name)
-);
-
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
deleted file mode 100644
index 87eb6aca2..000000000
--- a/weed/filer2/postgres/postgres_store.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package postgres
-
-import (
- "database/sql"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/filer2/abstract_sql"
- "github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/lib/pq"
-)
-
-const (
- CONNECTION_URL_PATTERN = "host=%s port=%d user=%s sslmode=%s connect_timeout=30"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &PostgresStore{})
-}
-
-type PostgresStore struct {
- abstract_sql.AbstractSqlStore
-}
-
-func (store *PostgresStore) GetName() string {
- return "postgres"
-}
-
-func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- return store.initialize(
- configuration.GetString(prefix+"username"),
- configuration.GetString(prefix+"password"),
- configuration.GetString(prefix+"hostname"),
- configuration.GetInt(prefix+"port"),
- configuration.GetString(prefix+"database"),
- configuration.GetString(prefix+"sslmode"),
- configuration.GetInt(prefix+"connection_max_idle"),
- configuration.GetInt(prefix+"connection_max_open"),
- )
-}
-
-func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
-
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
- store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%')ORDER BY NAME ASC LIMIT $5"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5"
-
- sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode)
- if password != "" {
- sqlUrl += " password=" + password
- }
- if database != "" {
- sqlUrl += " dbname=" + database
- }
- var dbErr error
- store.DB, dbErr = sql.Open("postgres", sqlUrl)
- if dbErr != nil {
- store.DB.Close()
- store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
- }
-
- store.DB.SetMaxIdleConns(maxIdle)
- store.DB.SetMaxOpenConns(maxOpen)
-
- if err = store.DB.Ping(); err != nil {
- return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
- }
-
- return nil
-}
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
deleted file mode 100644
index 0cea83ff9..000000000
--- a/weed/filer2/reader_at.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "io"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-type ChunkReadAt struct {
- masterClient *wdclient.MasterClient
- chunkViews []*ChunkView
- lookupFileId func(fileId string) (targetUrl string, err error)
- readerLock sync.Mutex
- fileSize int64
-
- chunkCache chunk_cache.ChunkCache
-}
-
-// var _ = io.ReaderAt(&ChunkReadAt{})
-
-type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
-
-func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
- return func(fileId string) (targetUrl string, err error) {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- vid := VolumeId(fileId)
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
- VolumeIds: []string{vid},
- })
- if err != nil {
- return err
- }
-
- locations := resp.LocationsMap[vid]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", fileId)
- return fmt.Errorf("failed to locate %s", fileId)
- }
-
- volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
-
- targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
-
- return nil
- })
- return
- }
-}
-
-func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
-
- return &ChunkReadAt{
- chunkViews: chunkViews,
- lookupFileId: LookupFn(filerClient),
- chunkCache: chunkCache,
- fileSize: fileSize,
- }
-}
-
-func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
-
- c.readerLock.Lock()
- defer c.readerLock.Unlock()
-
- glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
- return c.doReadAt(p[n:], offset+int64(n))
-}
-
-func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
-
- var buffer []byte
- startOffset, remaining := offset, int64(len(p))
- for i, chunk := range c.chunkViews {
- if remaining <= 0 {
- break
- }
- if startOffset < chunk.LogicOffset {
- gap := int(chunk.LogicOffset - startOffset)
- glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
- n += int(min(int64(gap), remaining))
- startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
- if remaining <= 0 {
- break
- }
- }
- // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
- chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
- if chunkStart >= chunkStop {
- continue
- }
- glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- buffer, err = c.readFromWholeChunkData(chunk)
- if err != nil {
- glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return
- }
- bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
- copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
- n += copied
- startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
- }
-
- glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
-
- if err == nil && remaining > 0 && c.fileSize > startOffset {
- delta := int(min(remaining, c.fileSize-startOffset))
- glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
- n += delta
- }
-
- if err == nil && offset+int64(len(p)) > c.fileSize {
- err = io.EOF
- }
- // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
-
- return
-
-}
-
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
-
- glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
-
- chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
- if chunkData != nil {
- glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
- } else {
- glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
- chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
- if err != nil {
- return
- }
- c.chunkCache.SetChunk(chunkView.FileId, chunkData)
- }
-
- return
-}
-
-func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
-
- return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
-
-}
diff --git a/weed/filer2/reader_at_test.go b/weed/filer2/reader_at_test.go
deleted file mode 100644
index 7bfc9a972..000000000
--- a/weed/filer2/reader_at_test.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package filer2
-
-import (
- "fmt"
- "io"
- "math"
- "strconv"
- "sync"
- "testing"
-)
-
-type mockChunkCache struct {
-}
-
-func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
- x, _ := strconv.Atoi(fileId)
- data = make([]byte, minSize)
- for i := 0; i < int(minSize); i++ {
- data[i] = byte(x)
- }
- return data
-}
-func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
-}
-
-func TestReaderAt(t *testing.T) {
-
- visibles := []VisibleInterval{
- {
- start: 1,
- stop: 2,
- fileId: "1",
- chunkSize: 9,
- },
- {
- start: 3,
- stop: 4,
- fileId: "3",
- chunkSize: 1,
- },
- {
- start: 5,
- stop: 6,
- fileId: "5",
- chunkSize: 2,
- },
- {
- start: 7,
- stop: 9,
- fileId: "7",
- chunkSize: 2,
- },
- {
- start: 9,
- stop: 10,
- fileId: "9",
- chunkSize: 2,
- },
- }
-
- readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 10,
- chunkCache: &mockChunkCache{},
- }
-
- testReadAt(t, readerAt, 0, 10, 10, nil)
- testReadAt(t, readerAt, 0, 12, 10, io.EOF)
- testReadAt(t, readerAt, 2, 8, 8, nil)
- testReadAt(t, readerAt, 3, 6, 6, nil)
-
-}
-
-func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) {
- data := make([]byte, size)
- n, err := readerAt.ReadAt(data, offset)
-
- for _, d := range data {
- fmt.Printf("%x", d)
- }
- fmt.Println()
-
- if expected != n {
- t.Errorf("unexpected read size: %d, expect: %d", n, expected)
- }
- if err != expectedErr {
- t.Errorf("unexpected read error: %v, expect: %v", err, expectedErr)
- }
-
-}
-
-func TestReaderAt0(t *testing.T) {
-
- visibles := []VisibleInterval{
- {
- start: 2,
- stop: 5,
- fileId: "1",
- chunkSize: 9,
- },
- {
- start: 7,
- stop: 9,
- fileId: "2",
- chunkSize: 9,
- },
- }
-
- readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 10,
- chunkCache: &mockChunkCache{},
- }
-
- testReadAt(t, readerAt, 0, 10, 10, nil)
- testReadAt(t, readerAt, 3, 16, 7, io.EOF)
- testReadAt(t, readerAt, 3, 5, 5, nil)
-
- testReadAt(t, readerAt, 11, 5, 0, io.EOF)
- testReadAt(t, readerAt, 10, 5, 0, io.EOF)
-
-}
-
-func TestReaderAt1(t *testing.T) {
-
- visibles := []VisibleInterval{
- {
- start: 2,
- stop: 5,
- fileId: "1",
- chunkSize: 9,
- },
- }
-
- readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 20,
- chunkCache: &mockChunkCache{},
- }
-
- testReadAt(t, readerAt, 0, 20, 20, nil)
- testReadAt(t, readerAt, 1, 7, 7, nil)
- testReadAt(t, readerAt, 0, 1, 1, nil)
- testReadAt(t, readerAt, 18, 4, 2, io.EOF)
- testReadAt(t, readerAt, 12, 4, 4, nil)
- testReadAt(t, readerAt, 4, 20, 16, io.EOF)
- testReadAt(t, readerAt, 4, 10, 10, nil)
- testReadAt(t, readerAt, 1, 10, 10, nil)
-
-}
diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go
deleted file mode 100644
index eaaecb740..000000000
--- a/weed/filer2/redis/redis_cluster_store.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package redis
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &RedisClusterStore{})
-}
-
-type RedisClusterStore struct {
- UniversalRedisStore
-}
-
-func (store *RedisClusterStore) GetName() string {
- return "redis_cluster"
-}
-
-func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
-
- configuration.SetDefault(prefix+"useReadOnly", true)
- configuration.SetDefault(prefix+"routeByLatency", true)
-
- return store.initialize(
- configuration.GetStringSlice(prefix+"addresses"),
- configuration.GetString(prefix+"password"),
- configuration.GetBool(prefix+"useReadOnly"),
- configuration.GetBool(prefix+"routeByLatency"),
- )
-}
-
-func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
- store.Client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: addresses,
- Password: password,
- ReadOnly: readOnly,
- RouteByLatency: routeByLatency,
- })
- return
-}
diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go
deleted file mode 100644
index 9debdb070..000000000
--- a/weed/filer2/redis/redis_store.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package redis
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &RedisStore{})
-}
-
-type RedisStore struct {
- UniversalRedisStore
-}
-
-func (store *RedisStore) GetName() string {
- return "redis"
-}
-
-func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- return store.initialize(
- configuration.GetString(prefix+"address"),
- configuration.GetString(prefix+"password"),
- configuration.GetInt(prefix+"database"),
- )
-}
-
-func (store *RedisStore) initialize(hostPort string, password string, database int) (err error) {
- store.Client = redis.NewClient(&redis.Options{
- Addr: hostPort,
- Password: password,
- DB: database,
- })
- return
-}
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
deleted file mode 100644
index fc2abef6c..000000000
--- a/weed/filer2/redis/universal_redis_store.go
+++ /dev/null
@@ -1,191 +0,0 @@
-package redis
-
-import (
- "context"
- "fmt"
- "sort"
- "strings"
- "time"
-
- "github.com/go-redis/redis"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const (
- DIR_LIST_MARKER = "\x00"
-)
-
-type UniversalRedisStore struct {
- Client redis.UniversalClient
-}
-
-func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
-
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- dir, name := entry.FullPath.DirAndName()
- if name != "" {
- _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
- if err != nil {
- return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
-
- data, err := store.Client.Get(string(fullpath)).Result()
- if err == redis.Nil {
- return nil, filer_pb.ErrNotFound
- }
-
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", fullpath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks([]byte(data))
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
-
- _, err = store.Client.Del(string(fullpath)).Result()
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- dir, name := fullpath.DirAndName()
- if name != "" {
- _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
- if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
-
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
- if err != nil {
- return fmt.Errorf("delete folder %s : %v", fullpath, err)
- }
-
- for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
- if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- dirListKey := genDirectoryListKey(string(fullpath))
- members, err := store.Client.SMembers(dirListKey).Result()
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
-
- // skip
- if startFileName != "" {
- var t []string
- for _, m := range members {
- if strings.Compare(m, startFileName) >= 0 {
- if m == startFileName {
- if inclusive {
- t = append(t, m)
- }
- } else {
- t = append(t, m)
- }
- }
- }
- members = t
- }
-
- // sort
- sort.Slice(members, func(i, j int) bool {
- return strings.Compare(members[i], members[j]) < 0
- })
-
- // limit
- if limit < len(members) {
- members = members[:limit]
- }
-
- // fetch entry meta
- for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
- entry, err := store.FindEntry(ctx, path)
- if err != nil {
- glog.V(0).Infof("list %s : %v", path, err)
- } else {
- if entry.TtlSec > 0 {
- if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.SRem(dirListKey, fileName).Result()
- continue
- }
- }
- entries = append(entries, entry)
- }
- }
-
- return entries, err
-}
-
-func genDirectoryListKey(dir string) (dirList string) {
- return dir + DIR_LIST_MARKER
-}
-
-func (store *UniversalRedisStore) Shutdown() {
- store.Client.Close()
-}
diff --git a/weed/filer2/redis2/redis_cluster_store.go b/weed/filer2/redis2/redis_cluster_store.go
deleted file mode 100644
index b252eabab..000000000
--- a/weed/filer2/redis2/redis_cluster_store.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package redis2
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &RedisCluster2Store{})
-}
-
-type RedisCluster2Store struct {
- UniversalRedis2Store
-}
-
-func (store *RedisCluster2Store) GetName() string {
- return "redis_cluster2"
-}
-
-func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
-
- configuration.SetDefault(prefix+"useReadOnly", true)
- configuration.SetDefault(prefix+"routeByLatency", true)
-
- return store.initialize(
- configuration.GetStringSlice(prefix+"addresses"),
- configuration.GetString(prefix+"password"),
- configuration.GetBool(prefix+"useReadOnly"),
- configuration.GetBool(prefix+"routeByLatency"),
- )
-}
-
-func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
- store.Client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: addresses,
- Password: password,
- ReadOnly: readOnly,
- RouteByLatency: routeByLatency,
- })
- return
-}
diff --git a/weed/filer2/redis2/redis_store.go b/weed/filer2/redis2/redis_store.go
deleted file mode 100644
index 1e2a20043..000000000
--- a/weed/filer2/redis2/redis_store.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package redis2
-
-import (
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &Redis2Store{})
-}
-
-type Redis2Store struct {
- UniversalRedis2Store
-}
-
-func (store *Redis2Store) GetName() string {
- return "redis2"
-}
-
-func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
- return store.initialize(
- configuration.GetString(prefix+"address"),
- configuration.GetString(prefix+"password"),
- configuration.GetInt(prefix+"database"),
- )
-}
-
-func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
- store.Client = redis.NewClient(&redis.Options{
- Addr: hostPort,
- Password: password,
- DB: database,
- })
- return
-}
diff --git a/weed/filer2/redis2/universal_redis_store.go b/weed/filer2/redis2/universal_redis_store.go
deleted file mode 100644
index c639635ef..000000000
--- a/weed/filer2/redis2/universal_redis_store.go
+++ /dev/null
@@ -1,166 +0,0 @@
-package redis2
-
-import (
- "context"
- "fmt"
- "time"
-
- "github.com/go-redis/redis"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const (
- DIR_LIST_MARKER = "\x00"
-)
-
-type UniversalRedis2Store struct {
- Client redis.UniversalClient
-}
-
-func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
-
- if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
-
- dir, name := entry.FullPath.DirAndName()
- if name != "" {
- if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
- return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
-
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
-
- data, err := store.Client.Get(string(fullpath)).Result()
- if err == redis.Nil {
- return nil, filer_pb.ErrNotFound
- }
-
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", fullpath, err)
- }
-
- entry = &filer2.Entry{
- FullPath: fullpath,
- }
- err = entry.DecodeAttributesAndChunks([]byte(data))
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
-
- return entry, nil
-}
-
-func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
-
- _, err = store.Client.Del(string(fullpath)).Result()
-
- if err != nil {
- return fmt.Errorf("delete %s : %v", fullpath, err)
- }
-
- dir, name := fullpath.DirAndName()
- if name != "" {
- _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
- if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
-
- members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
- if err != nil {
- return fmt.Errorf("delete folder %s : %v", fullpath, err)
- }
-
- for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
- if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
- }
- }
-
- return nil
-}
-
-func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
- return nil, filer2.ErrUnsupportedListDirectoryPrefixed
-}
-
-func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer2.Entry, err error) {
-
- dirListKey := genDirectoryListKey(string(fullpath))
- start := int64(0)
- if startFileName != "" {
- start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
- if !inclusive {
- start++
- }
- }
- members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
- if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
- }
-
- // fetch entry meta
- for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
- entry, err := store.FindEntry(ctx, path)
- if err != nil {
- glog.V(0).Infof("list %s : %v", path, err)
- } else {
- if entry.TtlSec > 0 {
- if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.ZRem(dirListKey, fileName).Result()
- continue
- }
- }
- entries = append(entries, entry)
- }
- }
-
- return entries, err
-}
-
-func genDirectoryListKey(dir string) (dirList string) {
- return dir + DIR_LIST_MARKER
-}
-
-func (store *UniversalRedis2Store) Shutdown() {
- store.Client.Close()
-}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
deleted file mode 100644
index fee9d45da..000000000
--- a/weed/filer2/stream.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package filer2
-
-import (
- "bytes"
- "io"
- "math"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-)
-
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
-
- // fmt.Printf("start to stream content for chunks: %+v\n", chunks)
- chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
-
- fileId2Url := make(map[string]string)
-
- for _, chunkView := range chunkViews {
-
- urlString, err := masterClient.LookupFileId(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- fileId2Url[chunkView.FileId] = urlString
- }
-
- for _, chunkView := range chunkViews {
-
- urlString := fileId2Url[chunkView.FileId]
- err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- }
-
- return nil
-
-}
-
-// ---------------- ReadAllReader ----------------------------------
-
-func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
-
- buffer := bytes.Buffer{}
-
- lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
- return masterClient.LookupFileId(fileId)
- }
-
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
-
- for _, chunkView := range chunkViews {
- urlString, err := lookupFileIdFn(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return nil, err
- }
- err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return nil, err
- }
- }
- return buffer.Bytes(), nil
-}
-
-// ---------------- ChunkStreamReader ----------------------------------
-type ChunkStreamReader struct {
- chunkViews []*ChunkView
- logicOffset int64
- buffer []byte
- bufferOffset int64
- bufferPos int
- chunkIndex int
- lookupFileId LookupFileIdFunctionType
-}
-
-var _ = io.ReadSeeker(&ChunkStreamReader{})
-
-func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
-
- lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
- return masterClient.LookupFileId(fileId)
- }
-
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
-
- return &ChunkStreamReader{
- chunkViews: chunkViews,
- lookupFileId: lookupFileIdFn,
- }
-}
-
-func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
-
- lookupFileIdFn := LookupFn(filerClient)
-
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
-
- return &ChunkStreamReader{
- chunkViews: chunkViews,
- lookupFileId: lookupFileIdFn,
- }
-}
-
-func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
- for n < len(p) {
- if c.isBufferEmpty() {
- if c.chunkIndex >= len(c.chunkViews) {
- return n, io.EOF
- }
- chunkView := c.chunkViews[c.chunkIndex]
- c.fetchChunkToBuffer(chunkView)
- c.chunkIndex++
- }
- t := copy(p[n:], c.buffer[c.bufferPos:])
- c.bufferPos += t
- n += t
- }
- return
-}
-
-func (c *ChunkStreamReader) isBufferEmpty() bool {
- return len(c.buffer) <= c.bufferPos
-}
-
-func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
-
- var totalSize int64
- for _, chunk := range c.chunkViews {
- totalSize += int64(chunk.Size)
- }
-
- var err error
- switch whence {
- case io.SeekStart:
- case io.SeekCurrent:
- offset += c.bufferOffset + int64(c.bufferPos)
- case io.SeekEnd:
- offset = totalSize + offset
- }
- if offset > totalSize {
- err = io.ErrUnexpectedEOF
- }
-
- for i, chunk := range c.chunkViews {
- if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
- if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
- c.fetchChunkToBuffer(chunk)
- c.chunkIndex = i + 1
- break
- }
- }
- }
- c.bufferPos = int(offset - c.bufferOffset)
-
- return offset, err
-
-}
-
-func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
- urlString, err := c.lookupFileId(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- c.buffer = buffer.Bytes()
- c.bufferPos = 0
- c.bufferOffset = chunkView.LogicOffset
-
- // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
-
- return nil
-}
-
-func (c *ChunkStreamReader) Close() {
- // TODO try to release and reuse buffer
-}
-
-func VolumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
-}
diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go
deleted file mode 100644
index 9c6e5c88d..000000000
--- a/weed/filer2/topics.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package filer2
-
-const (
- TopicsDir = "/topics"
- SystemLogDir = TopicsDir + "/.system/log"
-)