aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--go.mod2
-rw-r--r--go.sum27
-rw-r--r--weed/command/imports.go1
-rw-r--r--weed/command/scaffold/filer.toml10
-rw-r--r--weed/filer/arangodb/arangodb_store.go347
-rw-r--r--weed/filer/arangodb/arangodb_store_bucket.go40
-rw-r--r--weed/filer/arangodb/arangodb_store_kv.go54
-rw-r--r--weed/filer/arangodb/helpers.go136
-rw-r--r--weed/filer/arangodb/readme.md52
-rw-r--r--weed/filer/filer_delete_entry.go2
-rw-r--r--weed/server/filer_server.go1
12 files changed, 672 insertions, 2 deletions
diff --git a/.gitignore b/.gitignore
index 671b01051..e7ae92784 100644
--- a/.gitignore
+++ b/.gitignore
@@ -75,6 +75,8 @@ com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
+workspace/
+
test_data
build
target
diff --git a/go.mod b/go.mod
index 471d9b6ca..06e686e17 100644
--- a/go.mod
+++ b/go.mod
@@ -163,6 +163,8 @@ require (
cloud.google.com/go/compute v1.5.0 // indirect
cloud.google.com/go/iam v0.1.1 // indirect
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 // indirect
+ github.com/arangodb/go-driver v1.2.1 // indirect
+ github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect
github.com/aws/aws-sdk-go-v2 v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.4.0 // indirect
diff --git a/go.sum b/go.sum
index 197e73b75..15350f903 100644
--- a/go.sum
+++ b/go.sum
@@ -178,6 +178,15 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+<<<<<<< HEAD
+github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
+github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
+github.com/arangodb/go-driver v1.2.1 h1:HREDHhDmzdIWxHmfkfTESbYUnRjESjPh4WUuXq7FZa8=
+github.com/arangodb/go-driver v1.2.1/go.mod h1:zdDkJJnCj8DAkfbtIjIXnsTrWIiy6VhP3Vy14p+uQeY=
+github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g=
+github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho=
+=======
+>>>>>>> 93615b2a49b6510e6108a784b54be7d16719d730
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
@@ -261,6 +270,7 @@ github.com/colinmarc/hdfs/v2 v2.3.0 h1:tMxOjXn6+7iPUlxAyup9Ha2hnmLe3Sv5DM2qqbSQ2
github.com/colinmarc/hdfs/v2 v2.3.0/go.mod h1:nsyY1uyQOomU34KVQk9Qb/lDJobN1MQ/9WS6IqcVZno=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-iptables v0.4.3/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -273,7 +283,11 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+<<<<<<< HEAD
+github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4=
+=======
github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
+>>>>>>> 93615b2a49b6510e6108a784b54be7d16719d730
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
@@ -379,11 +393,18 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+<<<<<<< HEAD
+github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
+github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
+github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
+github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
+=======
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
+>>>>>>> 93615b2a49b6510e6108a784b54be7d16719d730
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -772,8 +793,14 @@ github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qq
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+<<<<<<< HEAD
+github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
+github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
+github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+=======
github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd h1:CmH9+J6ZSsIjUK3dcGsnCnO41eRBOnY12zwkn5qVwgc=
github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk=
+>>>>>>> 93615b2a49b6510e6108a784b54be7d16719d730
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seaweedfs/goexif v2.0.0+incompatible h1:x8pckiT12QQhifwhDQpeISgDfsqmQ6VR4LFPQ64JRps=
diff --git a/weed/command/imports.go b/weed/command/imports.go
index 3792c45c4..5b3195907 100644
--- a/weed/command/imports.go
+++ b/weed/command/imports.go
@@ -15,6 +15,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 5d4513c36..6a7835de3 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -285,6 +285,16 @@ healthcheck_enabled = false
index.max_result_window = 10000
+[arangodb] # in development dont use it
+enabled = false
+db_name = "seaweedfs"
+servers=["http://localhost:8529"] # list of servers to connect to
+# only basic auth supported for now
+user=""
+pass=""
+# skip tls cert validation
+insecure_skip_verify = true
+
##########################
##########################
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
new file mode 100644
index 000000000..27ed9132b
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -0,0 +1,347 @@
+package arangodb
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/arangodb/go-driver"
+ "github.com/arangodb/go-driver/http"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &ArangodbStore{})
+}
+
+var (
+ BUCKET_PREFIX = "/buckets"
+ DEFAULT_COLLECTION = "seaweed_no_bucket"
+ KVMETA_COLLECTION = "seaweed_kvmeta"
+)
+
+type ArangodbStore struct {
+ connect driver.Connection
+ client driver.Client
+ database driver.Database
+ kvCollection driver.Collection
+
+ buckets map[string]driver.Collection
+ mu sync.RWMutex
+
+ databaseName string
+}
+
+type Model struct {
+ Key string `json:"_key"`
+ Directory string `json:"directory,omitempty"`
+ Name string `json:"name,omitempty"`
+ Ttl string `json:"ttl,omitempty"`
+
+ //arangodb does not support binary blobs
+ //we encode byte slice into uint64 slice
+ //see helpers.go
+ Meta []uint64 `json:"meta"`
+}
+
+func (store *ArangodbStore) GetName() string {
+ return "arangodb"
+}
+
+func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ store.buckets = make(map[string]driver.Collection, 3)
+ store.databaseName = configuration.GetString(prefix + "db_name")
+ return store.connection(configuration.GetStringSlice(prefix+"servers"),
+ configuration.GetString(prefix+"user"),
+ configuration.GetString(prefix+"pass"),
+ configuration.GetBool(prefix+"insecure_skip_verify"),
+ )
+}
+
+func (store *ArangodbStore) connection(uris []string, user string, pass string, insecure bool) (err error) {
+ ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+
+ store.connect, err = http.NewConnection(http.ConnectionConfig{
+ Endpoints: uris,
+ TLSConfig: &tls.Config{
+ InsecureSkipVerify: insecure,
+ },
+ })
+ if err != nil {
+ return err
+ }
+ store.client, err = driver.NewClient(driver.ClientConfig{
+ Connection: store.connect,
+ Authentication: driver.BasicAuthentication(user, pass),
+ })
+ if err != nil {
+ return err
+ }
+ ok, err := store.client.DatabaseExists(ctx, store.databaseName)
+ if err != nil {
+ return err
+ }
+ if ok {
+ store.database, err = store.client.Database(ctx, store.databaseName)
+ } else {
+ store.database, err = store.client.CreateDatabase(ctx, store.databaseName, &driver.CreateDatabaseOptions{})
+ }
+ if err != nil {
+ return err
+ }
+ if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil {
+ return err
+ }
+ return err
+}
+
+type key int
+
+const (
+ transactionKey key = 0
+)
+
+func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ keys := make([]string, 0, len(store.buckets)+1)
+ for k := range store.buckets {
+ keys = append(keys, k)
+ }
+ keys = append(keys, store.kvCollection.Name())
+ txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{
+ Exclusive: keys,
+ }, &driver.BeginTransactionOptions{})
+ if err != nil {
+ return nil, err
+ }
+
+ return context.WithValue(ctx, transactionKey, txn), nil
+}
+
+func (store *ArangodbStore) CommitTransaction(ctx context.Context) error {
+ val := ctx.Value(transactionKey)
+ cast, ok := val.(driver.TransactionID)
+ if !ok {
+ return fmt.Errorf("txn cast fail %s:", val)
+ }
+ err := store.database.CommitTransaction(ctx, cast, &driver.CommitTransactionOptions{})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *ArangodbStore) RollbackTransaction(ctx context.Context) error {
+ val := ctx.Value(transactionKey)
+ cast, ok := val.(driver.TransactionID)
+ if !ok {
+ return fmt.Errorf("txn cast fail %s:", val)
+ }
+ err := store.database.AbortTransaction(ctx, cast, &driver.AbortTransactionOptions{})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+ model := &Model{
+ Key: hashString(string(entry.FullPath)),
+ Directory: dir,
+ Name: name,
+ Meta: bytesToArray(meta),
+ }
+ if entry.TtlSec > 0 {
+ model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339)
+ } else {
+ model.Ttl = ""
+ }
+
+ targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.CreateDocument(ctx, model)
+ if driver.IsConflict(err) {
+ return store.UpdateEntry(ctx, entry)
+ }
+
+ if err != nil {
+ return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err)
+ }
+
+ return nil
+
+}
+
+func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+ model := &Model{
+ Key: hashString(string(entry.FullPath)),
+ Directory: dir,
+ Name: name,
+ Meta: bytesToArray(meta),
+ }
+ if entry.TtlSec > 0 {
+ model.Ttl = time.Now().Add(time.Duration(entry.TtlSec) * time.Second).Format(time.RFC3339)
+ } else {
+ model.Ttl = "none"
+ }
+ targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.UpdateDocument(ctx, model.Key, model)
+ if err != nil {
+ return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+ var data Model
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return nil, err
+ }
+ _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data)
+ if err != nil {
+ if driver.IsNotFound(err) {
+ return nil, filer_pb.ErrNotFound
+ }
+ glog.Errorf("find %s: %v", fullpath, err)
+ return nil, filer_pb.ErrNotFound
+ }
+ if len(data.Meta) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(arrayToBytes(data.Meta)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath)))
+ if err != nil && !driver.IsNotFound(err) {
+ glog.Errorf("find %s: %v", fullpath, err)
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+ return nil
+}
+
+// this runs in log time
+func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+ var query string
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return err
+ }
+ query = query + fmt.Sprintf(`
+ for d in %s
+ filter starts_with(d.directory, "%s/") || d.directory == "%s"
+ remove d._key in %s`,
+ targetCollection.Name(),
+ strings.Join(strings.Split(string(fullpath), "/"), ","),
+ string(fullpath),
+ targetCollection.Name(),
+ )
+ cur, err := store.database.Query(ctx, query, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+ defer cur.Close()
+ return nil
+}
+
+func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/")
+ if err != nil {
+ return lastFileName, err
+ }
+ query := "for d in " + targetCollection.Name()
+ if includeStartFile {
+ query = query + " filter d.name >= \"" + startFileName + "\" "
+ } else {
+ query = query + " filter d.name > \"" + startFileName + "\" "
+ }
+ if prefix != "" {
+ query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix)
+ }
+ query = query + `
+filter d.directory == @dir
+sort d.name asc
+`
+ if limit > 0 {
+ query = query + "limit " + strconv.Itoa(int(limit))
+ }
+ query = query + "\n return d"
+ cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath})
+ if err != nil {
+ return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
+ }
+ defer cur.Close()
+ for cur.HasMore() {
+ var data Model
+ _, err = cur.ReadDocument(ctx, &data)
+ if err != nil {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(data.Directory, data.Name),
+ }
+ lastFileName = data.Name
+ converted := arrayToBytes(data.Meta)
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+
+ if !eachEntryFunc(entry) {
+ break
+ }
+
+ }
+ return lastFileName, err
+}
+
+func (store *ArangodbStore) Shutdown() {
+}
diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go
new file mode 100644
index 000000000..63d407309
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store_bucket.go
@@ -0,0 +1,40 @@
+package arangodb
+
+import (
+ "context"
+ "time"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+var _ filer.BucketAware = (*ArangodbStore)(nil)
+
+func (store *ArangodbStore) OnBucketCreation(bucket string) {
+ timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ // create the collection && add to cache
+ _, err := store.ensureBucket(timeout, bucket)
+ if err != nil {
+ glog.V(0).Infof("bucket create %s : %w", bucket, err)
+ }
+}
+func (store *ArangodbStore) OnBucketDeletion(bucket string) {
+ timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ collection, err := store.ensureBucket(timeout, bucket)
+ if err != nil {
+ glog.V(0).Infof("bucket delete %s : %w", bucket, err)
+ return
+ }
+ err = collection.Remove(timeout)
+ if err != nil && !driver.IsNotFound(err) {
+ glog.V(0).Infof("bucket delete %s : %w", bucket, err)
+ return
+ }
+}
+func (store *ArangodbStore) CanDropWholeBucket() bool {
+ return true
+}
diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go
new file mode 100644
index 000000000..c1307e78d
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store_kv.go
@@ -0,0 +1,54 @@
+package arangodb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ model := &Model{
+ Key: hashString(".kvstore." + string(key)),
+ Directory: ".kvstore." + string(key),
+ Meta: bytesToArray(value),
+ }
+
+ exists, err := store.kvCollection.DocumentExists(ctx, model.Key)
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+ if exists {
+ _, err = store.kvCollection.UpdateDocument(ctx, model.Key, model)
+ } else {
+ _, err = store.kvCollection.CreateDocument(ctx, model)
+ }
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ var model Model
+ _, err = store.kvCollection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model)
+ if driver.IsNotFound(err) {
+ return nil, filer.ErrKvNotFound
+ }
+ if err != nil {
+ glog.Errorf("kv get: %s %v", string(key), err)
+ return nil, filer.ErrKvNotFound
+ }
+ return arrayToBytes(model.Meta), nil
+}
+
+func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ _, err = store.kvCollection.RemoveDocument(ctx, hashString(".kvstore."+string(key)))
+ if err != nil {
+ glog.Errorf("kv del: %v", err)
+ return filer.ErrKvNotFound
+ }
+ return nil
+}
diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go
new file mode 100644
index 000000000..943189781
--- /dev/null
+++ b/weed/filer/arangodb/helpers.go
@@ -0,0 +1,136 @@
+package arangodb
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/binary"
+ "encoding/hex"
+ "io"
+ "strings"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+//convert a string into arango-key safe hex bytes hash
+func hashString(dir string) string {
+ h := md5.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return hex.EncodeToString(b)
+}
+
+// convert slice of bytes into slice of uint64
+// the first uint64 indicates the length in bytes
+func bytesToArray(bs []byte) []uint64 {
+ out := make([]uint64, 0, 2+len(bs)/8)
+ out = append(out, uint64(len(bs)))
+ for len(bs)%8 != 0 {
+ bs = append(bs, 0)
+ }
+ for i := 0; i < len(bs); i = i + 8 {
+ out = append(out, binary.BigEndian.Uint64(bs[i:]))
+ }
+ return out
+}
+
+// convert from slice of uint64 back to bytes
+// if input length is 0 or 1, will return nil
+func arrayToBytes(xs []uint64) []byte {
+ if len(xs) < 2 {
+ return nil
+ }
+ first := xs[0]
+ out := make([]byte, len(xs)*8) // i think this can actually be len(xs)*8-8, but i dont think an extra 8 bytes hurts...
+ for i := 1; i < len(xs); i = i + 1 {
+ binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i])
+ }
+ return out[:first]
+}
+
+// gets the collection the bucket points to from filepath
+func (store *ArangodbStore) extractBucketCollection(ctx context.Context, fullpath util.FullPath) (c driver.Collection, err error) {
+ bucket, _ := extractBucket(fullpath)
+ if bucket == "" {
+ bucket = DEFAULT_COLLECTION
+ }
+ c, err = store.ensureBucket(ctx, bucket)
+ if err != nil {
+ return nil, err
+ }
+ return c, err
+}
+
+// called by extractBucketCollection
+func extractBucket(fullpath util.FullPath) (string, string) {
+ if !strings.HasPrefix(string(fullpath), BUCKET_PREFIX+"/") {
+ return "", string(fullpath)
+ }
+ if strings.Count(string(fullpath), "/") < 3 {
+ return "", string(fullpath)
+ }
+ bucketAndObjectKey := string(fullpath)[len(BUCKET_PREFIX+"/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ bucket := bucketAndObjectKey
+ shortPath := "/"
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = string(util.FullPath(bucketAndObjectKey[t:]))
+ }
+ return bucket, shortPath
+}
+
+// get bucket collection from cache. if not exist, creates the buckets collection and grab it
+func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc driver.Collection, err error) {
+ var ok bool
+ store.mu.RLock()
+ bc, ok = store.buckets[bucket]
+ store.mu.RUnlock()
+ if ok {
+ return bc, nil
+ }
+ store.mu.Lock()
+ defer store.mu.Unlock()
+ store.buckets[bucket], err = store.ensureCollection(ctx, bucket)
+ if err != nil {
+ return nil, err
+ }
+ return store.buckets[bucket], nil
+}
+
+// creates collection if not exist, ensures indices if not exist
+func (store *ArangodbStore) ensureCollection(ctx context.Context, name string) (c driver.Collection, err error) {
+ ok, err := store.database.CollectionExists(ctx, name)
+ if err != nil {
+ return
+ }
+ if ok {
+ c, err = store.database.Collection(ctx, name)
+ } else {
+ c, err = store.database.CreateCollection(ctx, name, &driver.CreateCollectionOptions{})
+ }
+ if err != nil {
+ return
+ }
+ // ensure indices
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory", "name"},
+ &driver.EnsurePersistentIndexOptions{
+ Name: "directory_name_multi", Unique: true,
+ }); err != nil {
+ return
+ }
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory"},
+ &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil {
+ return
+ }
+ if _, _, err = c.EnsureTTLIndex(ctx, "ttl", 1,
+ &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil {
+ return
+ }
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{
+ Name: "IDX_name",
+ }); err != nil {
+ return
+ }
+ return c, nil
+}
diff --git a/weed/filer/arangodb/readme.md b/weed/filer/arangodb/readme.md
new file mode 100644
index 000000000..e189811fb
--- /dev/null
+++ b/weed/filer/arangodb/readme.md
@@ -0,0 +1,52 @@
+##arangodb
+
+database: https://github.com/arangodb/arangodb
+go driver: https://github.com/arangodb/go-driver
+
+options:
+
+```
+[arangodb]
+enabled=true
+db_name="seaweedfs"
+servers=["http://localhost:8529"]
+#basic auth
+user="root"
+pass="test"
+
+# tls settings
+insecure_skip_verify=true
+```
+
+i test using this dev database:
+`docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=test arangodb/arangodb:3.9.0`
+
+
+## features i don't personally need but are missing
+ [ ] provide tls cert to arango
+ [ ] authentication that is not basic auth
+ [ ] synchronise endpoint interval config
+ [ ] automatic creation of custom index
+ [ ] configure default arangodb collection sharding rules
+ [ ] configure default arangodb collection replication rules
+
+
+## complexity
+
+ok, so if https://www.arangodb.com/docs/stable/indexing-index-basics.html#persistent-index is correct
+
+O(1)
+- InsertEntry
+- UpdateEntry
+- FindEntry
+- DeleteEntry
+- KvPut
+- KvGet
+- KvDelete
+
+O(log(BUCKET_SIZE))
+- DeleteFolderChildren
+
+O(log(DIRECTORY_SIZE))
+- ListDirectoryEntries
+- ListDirectoryPrefixedEntries
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index c774f5d27..27e68433d 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -25,9 +25,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if findErr != nil {
return findErr
}
-
isDeleteCollection := f.isBucket(entry)
-
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures, func(chunks []*filer_pb.FileChunk) error {
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 7edd5870f..8779e9ac0 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -21,6 +21,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"