Diffstat (limited to 'weed/filer2')
-rw-r--r--  weed/filer2/abstract_sql/abstract_sql_store.go    30
-rw-r--r--  weed/filer2/cassandra/cassandra_store.go           11
-rw-r--r--  weed/filer2/entry.go                                2
-rw-r--r--  weed/filer2/entry_codec.go                         27
-rw-r--r--  weed/filer2/etcd/etcd_store.go                    196
-rw-r--r--  weed/filer2/filer.go                               72
-rw-r--r--  weed/filer2/filer_client_util.go                   57
-rw-r--r--  weed/filer2/filer_delete_entry.go                 102
-rw-r--r--  weed/filer2/filer_deletion.go                       9
-rw-r--r--  weed/filer2/filerstore.go                          14
-rw-r--r--  weed/filer2/leveldb/leveldb_store.go               35
-rw-r--r--  weed/filer2/leveldb2/leveldb2_store.go             41
-rw-r--r--  weed/filer2/leveldb2/leveldb2_store_test.go         4
-rw-r--r--  weed/filer2/memdb/memdb_store.go                  132
-rw-r--r--  weed/filer2/memdb/memdb_store_test.go             149
-rw-r--r--  weed/filer2/mysql/mysql_store.go                    9
-rw-r--r--  weed/filer2/postgres/postgres_store.go              1
-rw-r--r--  weed/filer2/redis/redis_cluster_store.go           14
-rw-r--r--  weed/filer2/redis/universal_redis_store.go         18
-rw-r--r--  weed/filer2/tikv/tikv_store.go                    251
-rw-r--r--  weed/filer2/tikv/tikv_store_unsupported.go         65
21 files changed, 840 insertions(+), 399 deletions(-)
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 3e8554957..d512467c7 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -10,13 +10,14 @@ import (
)
type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlListExclusive string
- SqlListInclusive string
+ DB *sql.DB
+ SqlInsert string
+ SqlUpdate string
+ SqlFind string
+ SqlDelete string
+ SqlDeleteFolderChildren string
+ SqlListExclusive string
+ SqlListInclusive string
}
type TxOrDB interface {
@@ -132,6 +133,21 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
return nil
}
+func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, hashToLong(string(fullpath)), fullpath)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
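Note: the new DeleteFolderChildren hook runs a single statement bound to SqlDeleteFolderChildren with the directory hash and directory path, mirroring the existing per-entry statements. A minimal sketch of how it is wired and called, assuming "store" is an initialized *AbstractSqlStore; the statement shown is the one added for MySQL further down in this diff (the postgres hunk uses $1/$2 placeholders instead), and the path is illustrative:

    store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
    if err := store.DeleteFolderChildren(context.Background(), filer2.FullPath("/some/dir")); err != nil {
        // the returned error already carries the path, e.g. "deleteFolderChildren /some/dir: ..."
        return err
    }
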
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index 466be5bf3..dcaab8bc4 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -112,6 +112,17 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
return nil
}
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=?",
+ fullpath).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index 3f8a19114..c901927bb 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -30,6 +30,7 @@ type Entry struct {
FullPath
Attr
+ Extended map[string][]byte
// the following is for files
Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
@@ -56,6 +57,7 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
IsDirectory: entry.IsDirectory(),
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
}
}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index cf4627b74..3a2dc6134 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -1,18 +1,21 @@
package filer2
import (
+ "bytes"
+ "fmt"
"os"
"time"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
message := &filer_pb.Entry{
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
}
return proto.Marshal(message)
}
@@ -27,6 +30,8 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
entry.Attr = PbToEntryAttribute(message.Attributes)
+ entry.Extended = message.Extended
+
entry.Chunks = message.Chunks
return nil
@@ -84,6 +89,10 @@ func EqualEntry(a, b *Entry) bool {
return false
}
+ if !eq(a.Extended, b.Extended) {
+ return false
+ }
+
for i := 0; i < len(a.Chunks); i++ {
if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
return false
@@ -91,3 +100,17 @@ func EqualEntry(a, b *Entry) bool {
}
return true
}
+
+func eq(a, b map[string][]byte) bool {
+ if len(a) != len(b) {
+ return false
+ }
+
+ for k, v := range a {
+ if w, ok := b[k]; !ok || !bytes.Equal(v, w) {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
new file mode 100644
index 000000000..2eb9e3e86
--- /dev/null
+++ b/weed/filer2/etcd/etcd_store.go
@@ -0,0 +1,196 @@
+package etcd
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+ "go.etcd.io/etcd/clientv3"
+)
+
+const (
+ DIR_FILE_SEPARATOR = byte(0x00)
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &EtcdStore{})
+}
+
+type EtcdStore struct {
+ client *clientv3.Client
+}
+
+func (store *EtcdStore) GetName() string {
+ return "etcd"
+}
+
+func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) {
+ servers := configuration.GetString("servers")
+ if servers == "" {
+ servers = "localhost:2379"
+ }
+
+ timeout := configuration.GetString("timeout")
+ if timeout == "" {
+ timeout = "3s"
+ }
+
+ return store.initialize(servers, timeout)
+}
+
+func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
+ glog.Infof("filer store etcd: %s", servers)
+
+ to, err := time.ParseDuration(timeout)
+ if err != nil {
+ return fmt.Errorf("parse timeout %s: %s", timeout, err)
+ }
+
+ store.client, err = clientv3.New(clientv3.Config{
+ Endpoints: strings.Split(servers, ","),
+ DialTimeout: to,
+ })
+ if err != nil {
+ return fmt.Errorf("connect to etcd %s: %s", servers, err)
+ }
+
+ return
+}
+
+func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ key := genKey(entry.DirAndName())
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+ key := genKey(fullpath.DirAndName())
+
+ resp, err := store.client.Get(ctx, string(key))
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ if len(resp.Kvs) == 0 {
+ return nil, filer2.ErrNotFound
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ key := genKey(fullpath.DirAndName())
+
+ if _, err := store.client.Delete(ctx, string(key)); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
+ return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) ListDirectoryEntries(
+ ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
+) (entries []*filer2.Entry, err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ resp, err := store.client.Get(ctx, string(directoryPrefix),
+ clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ for _, kv := range resp.Kvs {
+ fileName := getNameFromKey(kv.Key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer2.Entry{
+ FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = []byte(dirPath)
+ key = append(key, DIR_FILE_SEPARATOR)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = []byte(string(fullpath))
+ keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+ sepIndex := len(key) - 1
+ for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
+ sepIndex--
+ }
+
+ return string(key[sepIndex+1:])
+}
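Note on the new etcd store's key layout: each entry is stored under "<directory path>" + 0x00 + "<file name>", so every direct child of a directory shares the "<dir>\x00" prefix and both DeleteFolderChildren and ListDirectoryEntries can be served with clientv3 WithPrefix() range operations. A minimal sketch using the helpers above (the path is illustrative):

    key := genKey("/home/chris", "file1.jpg")                              // "/home/chris\x00file1.jpg"
    prefix := genDirectoryKeyPrefix(filer2.FullPath("/home/chris"), "")    // "/home/chris\x00"
    // bytes.HasPrefix(key, prefix) is true, while entries under a subdirectory
    // ("/home/chris/sub\x00...") do not match, so a prefix delete removes only
    // the direct children of /home/chris.
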
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index cf236b74d..b724e20fd 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -3,18 +3,21 @@ package filer2
import (
"context"
"fmt"
- "google.golang.org/grpc"
- "math"
"os"
"path/filepath"
"strings"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/karlseguin/ccache"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/karlseguin/ccache"
)
+const PaginationSize = 1024 * 256
+
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
@@ -32,7 +35,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
- fileIdDeletionChan: make(chan string, 4096),
+ fileIdDeletionChan: make(chan string, PaginationSize),
GrpcDialOption: grpcDialOption,
}
@@ -203,67 +206,6 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
return f.store.FindEntry(ctx, p)
}
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) {
- entry, err := f.FindEntry(ctx, p)
- if err != nil {
- return err
- }
-
- if entry.IsDirectory() {
- limit := int(1)
- if isRecursive {
- limit = math.MaxInt32
- }
- lastFileName := ""
- includeLastFile := false
- for limit > 0 {
- entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024)
- if err != nil {
- glog.Errorf("list folder %s: %v", p, err)
- return fmt.Errorf("list folder %s: %v", p, err)
- }
-
- if len(entries) == 0 {
- break
- }
-
- if isRecursive {
- for _, sub := range entries {
- lastFileName = sub.Name()
- err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, shouldDeleteChunks)
- if err != nil {
- return err
- }
- limit--
- if limit <= 0 {
- break
- }
- }
- }
-
- if len(entries) < 1024 {
- break
- }
- }
-
- f.cacheDelDirectory(string(p))
-
- }
-
- if shouldDeleteChunks {
- f.DeleteChunks(p, entry.Chunks)
- }
-
- if p == "/" {
- return nil
- }
- glog.V(3).Infof("deleting entry %v", p)
-
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
-
- return f.store.DeleteEntry(ctx, p)
-}
-
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
index 7e093eea2..1a10f7c20 100644
--- a/weed/filer2/filer_client_util.go
+++ b/weed/filer2/filer_client_util.go
@@ -3,6 +3,8 @@ package filer2
import (
"context"
"fmt"
+ "io"
+ "math"
"strings"
"sync"
@@ -112,47 +114,54 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string)
return err
}
- if resp.Entry != nil {
- entry = resp.Entry
+ if resp.Entry == nil {
+ glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
+ return nil
}
+ entry = resp.Entry
return nil
})
return
}
-func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) {
+func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- paginationLimit := 1024
-
lastEntryName := ""
- for {
-
- request := &filer_pb.ListEntriesRequest{
- Directory: fullDirPath,
- StartFromFileName: lastEntryName,
- Limit: uint32(paginationLimit),
- }
+ request := &filer_pb.ListEntriesRequest{
+ Directory: fullDirPath,
+ Prefix: prefix,
+ StartFromFileName: lastEntryName,
+ Limit: math.MaxUint32,
+ }
- glog.V(3).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
+ glog.V(3).Infof("read directory: %v", request)
+ stream, err := client.ListEntries(ctx, request)
+ if err != nil {
+ return fmt.Errorf("list %s: %v", fullDirPath, err)
+ }
- for _, entry := range resp.Entries {
- fn(entry)
- lastEntryName = entry.Name
+ var prevEntry *filer_pb.Entry
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ if prevEntry != nil {
+ fn(prevEntry, true)
+ }
+ break
+ } else {
+ return recvErr
+ }
}
-
- if len(resp.Entries) < paginationLimit {
- break
+ if prevEntry != nil {
+ fn(prevEntry, false)
}
-
+ prevEntry = resp.Entry
}
return nil
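ReadDirAllEntries now consumes a server-side stream and signals the last entry by buffering one entry behind the stream: each received entry flushes the previous one with isLast=false, and io.EOF flushes the final one with isLast=true. The same one-behind pattern in isolation (received and fn are stand-ins for the stream and the callback):

    var prev *filer_pb.Entry
    for _, e := range received {
        if prev != nil {
            fn(prev, false) // not the last entry yet
        }
        prev = e
    }
    if prev != nil {
        fn(prev, true) // in the real code this happens on io.EOF
    }
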
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
new file mode 100644
index 000000000..75a09e7ef
--- /dev/null
+++ b/weed/filer2/filer_delete_entry.go
@@ -0,0 +1,102 @@
+package filer2
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
+ if p == "/" {
+ return nil
+ }
+
+ entry, findErr := f.FindEntry(ctx, p)
+ if findErr != nil {
+ return findErr
+ }
+
+ var chunks []*filer_pb.FileChunk
+ chunks = append(chunks, entry.Chunks...)
+ if entry.IsDirectory() {
+ // delete the folder children, not including the folder itself
+ var dirChunks []*filer_pb.FileChunk
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ if err != nil {
+ return fmt.Errorf("delete directory %s: %v", p, err)
+ }
+ chunks = append(chunks, dirChunks...)
+ f.cacheDelDirectory(string(p))
+ }
+ // delete the file or folder
+ err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
+ if err != nil {
+ return fmt.Errorf("delete file %s: %v", p, err)
+ }
+
+ if shouldDeleteChunks {
+ go f.DeleteChunks(chunks)
+ }
+
+ return nil
+}
+
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+ entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
+
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ if sub.IsDirectory() {
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, err
+ }
+ if shouldDeleteChunks {
+ chunks = append(chunks, dirChunks...)
+ }
+ }
+
+ if len(entries) < PaginationSize {
+ break
+ }
+ }
+
+ f.cacheDelDirectory(string(entry.FullPath))
+
+ glog.V(3).Infof("deleting directory %v", entry.FullPath)
+
+ if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
+ return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return chunks, nil
+}
+
+func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
+
+ glog.V(3).Infof("deleting entry %v", entry.FullPath)
+
+ if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return nil
+}
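The replacement for Filer.DeleteEntryMetaAndData (moved here from filer.go) gains an ignoreRecursiveError flag between isRecursive and shouldDeleteChunks, pages children with PaginationSize, and queues chunk deletion asynchronously after the metadata is removed. A hedged caller sketch (the path is illustrative):

    // recursively delete a directory, stop on the first child error,
    // and queue the file chunks for deletion afterwards
    err := f.DeleteEntryMetaAndData(ctx, filer2.FullPath("/home/chris/tmp"),
        true,  // isRecursive
        false, // ignoreRecursiveError
        true)  // shouldDeleteChunks
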
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index fea93d57f..9937685f7 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -15,7 +15,7 @@ func (f *Filer) loopProcessingDeletion() {
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
- locs := f.MasterClient.GetVidLocations(vid)
+ locs, _ := f.MasterClient.GetVidLocations(vid)
var locations []operation.Location
for _, loc := range locs {
locations = append(locations, operation.Location{
@@ -51,9 +51,8 @@ func (f *Filer) loopProcessingDeletion() {
}
}
-func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
+func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
f.fileIdDeletionChan <- chunk.GetFileIdString()
}
}
@@ -70,7 +69,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
return
}
if newEntry == nil {
- f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks)
+ f.DeleteChunks(oldEntry.Chunks)
}
var toDelete []*filer_pb.FileChunk
@@ -84,5 +83,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(oldEntry.FullPath, toDelete)
+ f.DeleteChunks(toDelete)
}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index 231c7fc68..0bb0bd611 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -20,6 +20,7 @@ type FilerStore interface {
// err == filer2.ErrNotFound if not found
FindEntry(context.Context, FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, FullPath) (err error)
+ DeleteFolderChildren(context.Context, FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error)
@@ -34,6 +35,9 @@ type FilerStoreWrapper struct {
}
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
+ if innerStore, ok := store.(*FilerStoreWrapper); ok {
+ return innerStore
+ }
return &FilerStoreWrapper{
actualStore: store,
}
@@ -94,6 +98,16 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
return fsw.actualStore.DeleteEntry(ctx, fp)
}
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
+ }()
+
+ return fsw.actualStore.DeleteFolderChildren(ctx, fp)
+}
+
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
start := time.Now()
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index d00eba859..4952b3b3a 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -5,12 +5,13 @@ import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -123,6 +124,34 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
return nil
}
+func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ batch := new(leveldb.Batch)
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete([]byte(genKey(string(fullpath), fileName)))
+ }
+ iter.Release()
+
+ err = store.db.Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
index bce81e357..8a16822ab 100644
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -8,12 +8,13 @@ import (
"io"
"os"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -21,7 +22,7 @@ func init() {
}
type LevelDB2Store struct {
- dbs []*leveldb.DB
+ dbs []*leveldb.DB
dbCount int
}
@@ -46,7 +47,7 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
CompactionTableSizeMultiplier: 4,
}
- for d := 0 ; d < dbCount; d++ {
+ for d := 0; d < dbCount; d++ {
dbFolder := fmt.Sprintf("%s/%02d", dir, d)
os.MkdirAll(dbFolder, 0755)
db, dbErr := leveldb.OpenFile(dbFolder, opts)
@@ -134,6 +135,34 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
return nil
}
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+
+ batch := new(leveldb.Batch)
+
+ iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(append(directoryPrefix, []byte(fileName)...))
+ }
+ iter.Release()
+
+ err = store.dbs[partitionId].Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
@@ -204,5 +233,5 @@ func hashToBytes(dir string, dbCount int) ([]byte, int) {
x := b[len(b)-1]
- return b, int(x)%dbCount
+ return b, int(x) % dbCount
}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index a16803ca1..e28ef7dac 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -13,7 +13,7 @@ func TestCreateAndFind(t *testing.T) {
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
- store.initialize(dir,2)
+ store.initialize(dir, 2)
filer.SetStore(store)
filer.DisableDirectoryCache()
@@ -68,7 +68,7 @@ func TestEmptyRoot(t *testing.T) {
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
- store.initialize(dir,2)
+ store.initialize(dir, 2)
filer.SetStore(store)
filer.DisableDirectoryCache()
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
deleted file mode 100644
index 9c10a5472..000000000
--- a/weed/filer2/memdb/memdb_store.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package memdb
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/google/btree"
- "strings"
- "sync"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &MemDbStore{})
-}
-
-type MemDbStore struct {
- tree *btree.BTree
- treeLock sync.Mutex
-}
-
-type entryItem struct {
- *filer2.Entry
-}
-
-func (a entryItem) Less(b btree.Item) bool {
- return strings.Compare(string(a.FullPath), string(b.(entryItem).FullPath)) < 0
-}
-
-func (store *MemDbStore) GetName() string {
- return "memory"
-}
-
-func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) {
- store.tree = btree.New(8)
- return nil
-}
-
-func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *MemDbStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *MemDbStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- // println("inserting", entry.FullPath)
- store.treeLock.Lock()
- store.tree.ReplaceOrInsert(entryItem{entry})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
- return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
- }
- store.treeLock.Lock()
- store.tree.ReplaceOrInsert(entryItem{entry})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
- if item == nil {
- return nil, filer2.ErrNotFound
- }
- entry = item.(entryItem).Entry
- return entry, nil
-}
-
-func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- store.treeLock.Lock()
- store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
-
- startFrom := string(fullpath)
- if startFileName != "" {
- startFrom = startFrom + "/" + startFileName
- }
-
- store.tree.AscendGreaterOrEqual(entryItem{&filer2.Entry{FullPath: filer2.FullPath(startFrom)}},
- func(item btree.Item) bool {
- if limit <= 0 {
- return false
- }
- entry := item.(entryItem).Entry
- // println("checking", entry.FullPath)
-
- if entry.FullPath == fullpath {
- // skipping the current directory
- // println("skipping the folder", entry.FullPath)
- return true
- }
-
- dir, name := entry.FullPath.DirAndName()
- if name == startFileName {
- if inclusive {
- limit--
- entries = append(entries, entry)
- }
- return true
- }
-
- // only iterate the same prefix
- if !strings.HasPrefix(string(entry.FullPath), string(fullpath)) {
- // println("breaking from", entry.FullPath)
- return false
- }
-
- if dir != string(fullpath) {
- // this could be items in deeper directories
- // println("skipping deeper folder", entry.FullPath)
- return true
- }
- // now process the directory items
- // println("adding entry", entry.FullPath)
- limit--
- entries = append(entries, entry)
- return true
- },
- )
- return entries, nil
-}
diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go
deleted file mode 100644
index d823c5177..000000000
--- a/weed/filer2/memdb/memdb_store_test.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package memdb
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "testing"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- store := &MemDbStore{}
- store.Initialize(nil)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
-}
-
-func TestCreateFileAndList(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- store := &MemDbStore{}
- store.Initialize(nil)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"),
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- entry2 := &filer2.Entry{
- FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"),
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- filer.CreateEntry(ctx, entry1)
- filer.CreateEntry(ctx, entry2)
-
- // checking the 2 files
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100)
-
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
-
- if len(entries) != 2 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- if entries[0].FullPath != entry1.FullPath {
- t.Errorf("find wrong entry 1: %v", entries[0].FullPath)
- return
- }
-
- if entries[1].FullPath != entry2.FullPath {
- t.Errorf("find wrong entry 2: %v", entries[1].FullPath)
- return
- }
-
- // checking the offset
- entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking root directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // add file3
- file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg")
- entry3 := &filer2.Entry{
- FullPath: file3Path,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
- filer.CreateEntry(ctx, entry3)
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 2 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // delete file and count
- filer.DeleteEntryMetaAndData(ctx, file3Path, false, false)
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go
index e18299bd2..d1b06ece5 100644
--- a/weed/filer2/mysql/mysql_store.go
+++ b/weed/filer2/mysql/mysql_store.go
@@ -35,19 +35,26 @@ func (store *MysqlStore) Initialize(configuration util.Configuration) (err error
configuration.GetString("database"),
configuration.GetInt("connection_max_idle"),
configuration.GetInt("connection_max_open"),
+ configuration.GetBool("interpolateParams"),
)
}
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int) (err error) {
+func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
+ interpolateParams bool) (err error) {
store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?"
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ if interpolateParams {
+ sqlUrl += "&interpolateParams=true"
+ }
+
var dbErr error
store.DB, dbErr = sql.Open("mysql", sqlUrl)
if dbErr != nil {
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
index ffd3d1e01..3ec000fe0 100644
--- a/weed/filer2/postgres/postgres_store.go
+++ b/weed/filer2/postgres/postgres_store.go
@@ -45,6 +45,7 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go
index 11c315391..f1ad4b35c 100644
--- a/weed/filer2/redis/redis_cluster_store.go
+++ b/weed/filer2/redis/redis_cluster_store.go
@@ -19,16 +19,24 @@ func (store *RedisClusterStore) GetName() string {
}
func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) {
+
+ configuration.SetDefault("useReadOnly", true)
+ configuration.SetDefault("routeByLatency", true)
+
return store.initialize(
configuration.GetStringSlice("addresses"),
configuration.GetString("password"),
+ configuration.GetBool("useReadOnly"),
+ configuration.GetBool("routeByLatency"),
)
}
-func (store *RedisClusterStore) initialize(addresses []string, password string) (err error) {
+func (store *RedisClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: addresses,
- Password: password,
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
})
return
}
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index ce41d4d70..62257e91e 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -99,6 +99,24 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
return nil
}
+func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := filer2.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go
new file mode 100644
index 000000000..4eb8cb90d
--- /dev/null
+++ b/weed/filer2/tikv/tikv_store.go
@@ -0,0 +1,251 @@
+// +build !386
+// +build !arm
+
+package tikv
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+
+ "github.com/pingcap/tidb/kv"
+ "github.com/pingcap/tidb/store/tikv"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &TikvStore{})
+}
+
+type TikvStore struct {
+ store kv.Storage
+}
+
+func (store *TikvStore) GetName() string {
+ return "tikv"
+}
+
+func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
+ pdAddr := configuration.GetString("pdAddress")
+ return store.initialize(pdAddr)
+}
+
+func (store *TikvStore) initialize(pdAddr string) (err error) {
+ glog.Infof("filer store tikv pd address: %s", pdAddr)
+
+ driver := tikv.Driver{}
+
+ store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr))
+
+ if err != nil {
+ return fmt.Errorf("open tikv %s : %v", pdAddr, err)
+ }
+
+ return
+}
+
+func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.store.Begin()
+ if err != nil {
+ return ctx, err
+ }
+ return context.WithValue(ctx, "tx", tx), nil
+}
+func (store *TikvStore) CommitTransaction(ctx context.Context) error {
+ tx, ok := ctx.Value("tx").(kv.Transaction)
+ if ok {
+ return tx.Commit(ctx)
+ }
+ return nil
+}
+func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
+ tx, ok := ctx.Value("tx").(kv.Transaction)
+ if ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+func (store *TikvStore) getTx(ctx context.Context) kv.Transaction {
+ if tx, ok := ctx.Value("tx").(kv.Transaction); ok {
+ return tx
+ }
+ return nil
+}
+
+func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ dir, name := entry.DirAndName()
+ key := genKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.getTx(ctx).Set(key, value)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+
+ data, err := store.getTx(ctx).Get(ctx, key)
+
+ if err == kv.ErrNotExist {
+ return nil, filer2.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+
+ err = store.getTx(ctx).Delete(key)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ tx := store.getTx(ctx)
+
+ iter, err := tx.Iter(directoryPrefix, nil)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ iter.Next()
+ continue
+ }
+
+ if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ iter.Next()
+ }
+
+ return nil
+}
+
+func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+ lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName)
+
+ iter, err := store.getTx(ctx).Iter(lastFileStart, nil)
+ if err != nil {
+ return nil, fmt.Errorf("list %s: %v", fullpath, err)
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ iter.Next()
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ iter.Next()
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer2.Entry{
+ FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ }
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+
+ if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ iter.Next()
+ }
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ return b
+}
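The TiKV store uses a different prefix scheme: keys are md5(directory) followed by the file name, so every direct child of a directory shares a fixed 16-byte prefix and getNameFromKey slices at md5.Size. A subdirectory hashes to a different prefix, so DeleteFolderChildren here removes only the direct children; the recursion in filer_delete_entry.go handles deeper levels. A small sketch with the helpers above (the path is illustrative):

    prefix := genDirectoryKeyPrefix(filer2.FullPath("/home/chris"), "") // md5("/home/chris"), 16 bytes
    key := genKey("/home/chris", "file1.jpg")                           // prefix + "file1.jpg"
    name := getNameFromKey(key)                                         // "file1.jpg"
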
diff --git a/weed/filer2/tikv/tikv_store_unsupported.go b/weed/filer2/tikv/tikv_store_unsupported.go
new file mode 100644
index 000000000..36de2d974
--- /dev/null
+++ b/weed/filer2/tikv/tikv_store_unsupported.go
@@ -0,0 +1,65 @@
+// +build 386 arm
+
+package tikv
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &TikvStore{})
+}
+
+type TikvStore struct {
+}
+
+func (store *TikvStore) GetName() string {
+ return "tikv"
+}
+
+func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) initialize(pdAddr string) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return nil, fmt.Errorf("not implemented for 32 bit computers")
+}
+func (store *TikvStore) CommitTransaction(ctx context.Context) error {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+ return nil, fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ return fmt.Errorf("not implemented for 32 bit computers")
+}
+
+func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+ return nil, fmt.Errorf("not implemented for 32 bit computers")
+}