Diffstat (limited to 'weed/filer/arangodb')
-rw-r--r--  weed/filer/arangodb/arangodb_store.go          347
-rw-r--r--  weed/filer/arangodb/arangodb_store_bucket.go    40
-rw-r--r--  weed/filer/arangodb/arangodb_store_kv.go        54
-rw-r--r--  weed/filer/arangodb/helpers.go                 136
-rw-r--r--  weed/filer/arangodb/readme.md                   52
5 files changed, 629 insertions, 0 deletions
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
new file mode 100644
index 000000000..9fd1fffb3
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -0,0 +1,347 @@
+package arangodb
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/arangodb/go-driver"
+ "github.com/arangodb/go-driver/http"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &ArangodbStore{})
+}
+
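+// entries under BUCKET_PREFIX get one collection per bucket; everything else is stored
+// in DEFAULT_COLLECTION, and filer kv metadata lives in KVMETA_COLLECTION.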
+var (
+ BUCKET_PREFIX = "/buckets"
+ DEFAULT_COLLECTION = "seaweed_no_bucket"
+ KVMETA_COLLECTION = "seaweed_kvmeta"
+)
+
+type ArangodbStore struct {
+ connect driver.Connection
+ client driver.Client
+ database driver.Database
+ kvCollection driver.Collection
+
+ buckets map[string]driver.Collection
+ mu sync.RWMutex
+
+ databaseName string
+}
+
+type Model struct {
+ Key string `json:"_key"`
+ Directory string `json:"directory,omitempty"`
+ Name string `json:"name,omitempty"`
+ Ttl string `json:"ttl,omitempty"`
+
+ // arangodb does not support binary blobs
+ // we encode the byte slice into a uint64 slice
+ // see helpers.go
+ Meta []uint64 `json:"meta"`
+}
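+
+// a stored document therefore looks roughly like (illustrative values):
+//   {"_key": "<md5 hex of the full path>", "directory": "/buckets/b/dir",
+//    "name": "file", "ttl": "", "meta": [<byte length>, <packed words>...]}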
+
+func (store *ArangodbStore) GetName() string {
+ return "arangodb"
+}
+
+func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ store.buckets = make(map[string]driver.Collection, 3)
+ store.databaseName = configuration.GetString(prefix + "db_name")
+ return store.connection(configuration.GetStringSlice(prefix+"servers"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"insecure_skip_verify"),
+ )
+}
+
+func (store *ArangodbStore) connection(uris []string, user string, pass string, insecure bool) (err error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ store.connect, err = http.NewConnection(http.ConnectionConfig{
+ Endpoints: uris,
+ TLSConfig: &tls.Config{
+ InsecureSkipVerify: insecure,
+ },
+ })
+ if err != nil {
+ return err
+ }
+ store.client, err = driver.NewClient(driver.ClientConfig{
+ Connection: store.connect,
+ Authentication: driver.BasicAuthentication(user, pass),
+ })
+ if err != nil {
+ return err
+ }
+ ok, err := store.client.DatabaseExists(ctx, store.databaseName)
+ if err != nil {
+ return err
+ }
+ if ok {
+ store.database, err = store.client.Database(ctx, store.databaseName)
+ } else {
+ store.database, err = store.client.CreateDatabase(ctx, store.databaseName, &driver.CreateDatabaseOptions{})
+ }
+ if err != nil {
+ return err
+ }
+ if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil {
+ return err
+ }
+ return err
+}
+
+type key int
+
+const (
+ transactionKey key = 0
+)
+
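+// BeginTransaction opens an exclusive transaction over every known bucket collection plus
+// the kv collection, and stashes the transaction id on the returned context so that
+// CommitTransaction and RollbackTransaction can retrieve it.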
+func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ keys := make([]string, 0, len(store.buckets)+1)
+ for k := range store.buckets {
+ keys = append(keys, k)
+ }
+ keys = append(keys, store.kvCollection.Name())
+ txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{
+ Exclusive: keys,
+ }, &driver.BeginTransactionOptions{})
+ if err != nil {
+ return nil, err
+ }
+
+ return context.WithValue(ctx, transactionKey, txn), nil
+}
+
+func (store *ArangodbStore) CommitTransaction(ctx context.Context) error {
+ val := ctx.Value(transactionKey)
+ cast, ok := val.(driver.TransactionID)
+ if !ok {
+ return fmt.Errorf("txn cast fail %s:", val)
+ }
+ err := store.database.CommitTransaction(ctx, cast, &driver.CommitTransactionOptions{})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *ArangodbStore) RollbackTransaction(ctx context.Context) error {
+ val := ctx.Value(transactionKey)
+ cast, ok := val.(driver.TransactionID)
+ if !ok {
+ return fmt.Errorf("txn cast fail %s:", val)
+ }
+ err := store.database.AbortTransaction(ctx, cast, &driver.AbortTransactionOptions{})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+ model := &Model{
+ Key: hashString(string(entry.FullPath)),
+ Directory: dir,
+ Name: name,
+ Meta: bytesToArray(meta),
+ }
+ if entry.TtlSec > 0 {
+ model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339)
+ } else {
+ model.Ttl = ""
+ }
+
+ targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.CreateDocument(ctx, model)
+ if driver.IsConflict(err) {
+ return store.UpdateEntry(ctx, entry)
+ }
+
+ if err != nil {
+ return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err)
+ }
+
+ return nil
+
+}
+
+func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+ model := &Model{
+ Key: hashString(string(entry.FullPath)),
+ Directory: dir,
+ Name: name,
+ Meta: bytesToArray(meta),
+ }
+ if entry.TtlSec > 0 {
+ model.Ttl = time.Now().Add(time.Duration(entry.TtlSec) * time.Second).Format(time.RFC3339)
+ } else {
+ model.Ttl = "none"
+ }
+ targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.UpdateDocument(ctx, model.Key, model)
+ if err != nil {
+ return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+ var data Model
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return nil, err
+ }
+ _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data)
+ if err != nil {
+ if driver.IsNotFound(err) {
+ return nil, filer_pb.ErrNotFound
+ }
+ glog.Errorf("find %s: %v", fullpath, err)
+ return nil, filer_pb.ErrNotFound
+ }
+ if len(data.Meta) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(arrayToBytes(data.Meta)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath)))
+ if err != nil && !driver.IsNotFound(err) {
+ glog.Errorf("find %s: %v", fullpath, err)
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+ return nil
+}
+
+// DeleteFolderChildren removes every entry under the given directory; with the directory
+// index this runs in roughly logarithmic time (see readme.md)
+func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+ var query string
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return err
+ }
+ query = query + fmt.Sprintf(`
+ for d in %s
+ filter starts_with(d.directory, "%s/") || d.directory == "%s"
+ remove d._key in %s`,
+ targetCollection.Name(),
+ string(fullpath),
+ string(fullpath),
+ targetCollection.Name(),
+ )
+ cur, err := store.database.Query(ctx, query, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+ defer cur.Close()
+ return nil
+}
+
+func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/")
+ if err != nil {
+ return lastFileName, err
+ }
+ query := "for d in " + targetCollection.Name()
+ if includeStartFile {
+ query = query + " filter d.name >= \"" + startFileName + "\" "
+ } else {
+ query = query + " filter d.name > \"" + startFileName + "\" "
+ }
+ if prefix != "" {
+ query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix)
+ }
+ query = query + `
+filter d.directory == @dir
+sort d.name asc
+`
+ if limit > 0 {
+ query = query + "limit " + strconv.Itoa(int(limit))
+ }
+ query = query + "\n return d"
+ cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath})
+ if err != nil {
+ return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
+ }
+ defer cur.Close()
+ for cur.HasMore() {
+ var data Model
+ _, err = cur.ReadDocument(ctx, &data)
+ if err != nil {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(data.Directory, data.Name),
+ }
+ lastFileName = data.Name
+ converted := arrayToBytes(data.Meta)
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+
+ if !eachEntryFunc(entry) {
+ break
+ }
+
+ }
+ return lastFileName, err
+}
+
+func (store *ArangodbStore) Shutdown() {
+}
diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go
new file mode 100644
index 000000000..810d639a7
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store_bucket.go
@@ -0,0 +1,40 @@
+package arangodb
+
+import (
+ "context"
+ "time"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+var _ filer.BucketAware = (*ArangodbStore)(nil)
+
+func (store *ArangodbStore) OnBucketCreation(bucket string) {
+ timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ // create the collection && add to cache
+ _, err := store.ensureBucket(timeout, bucket)
+ if err != nil {
+ glog.Errorf("bucket create %s: %v", bucket, err)
+ }
+}
+func (store *ArangodbStore) OnBucketDeletion(bucket string) {
+ timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ collection, err := store.ensureBucket(timeout, bucket)
+ if err != nil {
+ glog.Errorf("bucket delete %s: %v", bucket, err)
+ return
+ }
+ err = collection.Remove(timeout)
+ if err != nil && !driver.IsNotFound(err) {
+ glog.Errorf("bucket delete %s: %v", bucket, err)
+ return
+ }
+}
+func (store *ArangodbStore) CanDropWholeBucket() bool {
+ return true
+}
diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go
new file mode 100644
index 000000000..c1307e78d
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store_kv.go
@@ -0,0 +1,54 @@
+package arangodb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
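+// kv entries live in the dedicated kv collection; the raw key is namespaced with a
+// ".kvstore." prefix and md5-hashed (hashString) to form a valid ArangoDB _key.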
+func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ model := &Model{
+ Key: hashString(".kvstore." + string(key)),
+ Directory: ".kvstore." + string(key),
+ Meta: bytesToArray(value),
+ }
+
+ exists, err := store.kvCollection.DocumentExists(ctx, model.Key)
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+ if exists {
+ _, err = store.kvCollection.UpdateDocument(ctx, model.Key, model)
+ } else {
+ _, err = store.kvCollection.CreateDocument(ctx, model)
+ }
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ var model Model
+ _, err = store.kvCollection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model)
+ if driver.IsNotFound(err) {
+ return nil, filer.ErrKvNotFound
+ }
+ if err != nil {
+ glog.Errorf("kv get: %s %v", string(key), err)
+ return nil, filer.ErrKvNotFound
+ }
+ return arrayToBytes(model.Meta), nil
+}
+
+func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ _, err = store.kvCollection.RemoveDocument(ctx, hashString(".kvstore."+string(key)))
+ if err != nil {
+ glog.Errorf("kv del: %v", err)
+ return filer.ErrKvNotFound
+ }
+ return nil
+}
diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go
new file mode 100644
index 000000000..943189781
--- /dev/null
+++ b/weed/filer/arangodb/helpers.go
@@ -0,0 +1,136 @@
+package arangodb
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/binary"
+ "encoding/hex"
+ "io"
+ "strings"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// hashString converts a string into an ArangoDB-key-safe hex digest (md5 of the input)
+func hashString(dir string) string {
+ h := md5.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return hex.EncodeToString(b)
+}
+
+// convert slice of bytes into slice of uint64
+// the first uint64 indicates the length in bytes
+func bytesToArray(bs []byte) []uint64 {
+ out := make([]uint64, 0, 2+len(bs)/8)
+ out = append(out, uint64(len(bs)))
+ for len(bs)%8 != 0 {
+ bs = append(bs, 0)
+ }
+ for i := 0; i < len(bs); i = i + 8 {
+ out = append(out, binary.BigEndian.Uint64(bs[i:]))
+ }
+ return out
+}
+
+// convert from slice of uint64 back to bytes
+// if input length is 0 or 1, will return nil
+func arrayToBytes(xs []uint64) []byte {
+ if len(xs) < 2 {
+ return nil
+ }
+ first := xs[0]
+ out := make([]byte, len(xs)*8) // over-allocates by the 8 bytes of the length word, which is harmless
+ for i := 1; i < len(xs); i = i + 1 {
+ binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i])
+ }
+ return out[:first]
+}
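+
+// for illustration: bytesToArray([]byte{0xAA, 0xBB, 0xCC}) yields
+// {3, 0xAABBCC0000000000} (the byte length, then the zero-padded big-endian word),
+// and arrayToBytes truncates the decoded buffer back to those three bytes.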
+
+// extractBucketCollection resolves the collection backing the bucket that fullpath belongs to
+func (store *ArangodbStore) extractBucketCollection(ctx context.Context, fullpath util.FullPath) (c driver.Collection, err error) {
+ bucket, _ := extractBucket(fullpath)
+ if bucket == "" {
+ bucket = DEFAULT_COLLECTION
+ }
+ c, err = store.ensureBucket(ctx, bucket)
+ if err != nil {
+ return nil, err
+ }
+ return c, err
+}
+
+// called by extractBucketCollection
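+// e.g. extractBucket("/buckets/mybucket/dir/file") returns ("mybucket", "/dir/file");
+// paths outside BUCKET_PREFIX return ("", fullpath) unchanged.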
+func extractBucket(fullpath util.FullPath) (string, string) {
+ if !strings.HasPrefix(string(fullpath), BUCKET_PREFIX+"/") {
+ return "", string(fullpath)
+ }
+ if strings.Count(string(fullpath), "/") < 3 {
+ return "", string(fullpath)
+ }
+ bucketAndObjectKey := string(fullpath)[len(BUCKET_PREFIX+"/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ bucket := bucketAndObjectKey
+ shortPath := "/"
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = string(util.FullPath(bucketAndObjectKey[t:]))
+ }
+ return bucket, shortPath
+}
+
+// ensureBucket returns the bucket's collection from the cache, creating (and caching) the
+// collection if it does not exist yet
+func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc driver.Collection, err error) {
+ var ok bool
+ store.mu.RLock()
+ bc, ok = store.buckets[bucket]
+ store.mu.RUnlock()
+ if ok {
+ return bc, nil
+ }
+ store.mu.Lock()
+ defer store.mu.Unlock()
+ store.buckets[bucket], err = store.ensureCollection(ctx, bucket)
+ if err != nil {
+ return nil, err
+ }
+ return store.buckets[bucket], nil
+}
+
+// ensureCollection creates the collection if it does not exist and ensures the required indices are present
+func (store *ArangodbStore) ensureCollection(ctx context.Context, name string) (c driver.Collection, err error) {
+ ok, err := store.database.CollectionExists(ctx, name)
+ if err != nil {
+ return
+ }
+ if ok {
+ c, err = store.database.Collection(ctx, name)
+ } else {
+ c, err = store.database.CreateCollection(ctx, name, &driver.CreateCollectionOptions{})
+ }
+ if err != nil {
+ return
+ }
+ // ensure indices
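+ // the (directory, name) index serves directory listings and enforces uniqueness per directory;
+ // the directory index serves DeleteFolderChildren, the ttl index expires documents whose
+ // ttl timestamp has passed, and the name index backs name/prefix filters.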
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory", "name"},
+ &driver.EnsurePersistentIndexOptions{
+ Name: "directory_name_multi", Unique: true,
+ }); err != nil {
+ return
+ }
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory"},
+ &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil {
+ return
+ }
+ if _, _, err = c.EnsureTTLIndex(ctx, "ttl", 1,
+ &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil {
+ return
+ }
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{
+ Name: "IDX_name",
+ }); err != nil {
+ return
+ }
+ return c, nil
+}
diff --git a/weed/filer/arangodb/readme.md b/weed/filer/arangodb/readme.md
new file mode 100644
index 000000000..e189811fb
--- /dev/null
+++ b/weed/filer/arangodb/readme.md
@@ -0,0 +1,52 @@
+## arangodb
+
+database: https://github.com/arangodb/arangodb
+go driver: https://github.com/arangodb/go-driver
+
+options:
+
+```
+[arangodb]
+enabled=true
+db_name="seaweedfs"
+servers=["http://localhost:8529"]
+# basic auth
+username="root"
+password="test"
+
+# tls settings
+insecure_skip_verify=true
+```
+
+i test using this dev database:
+`docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=test arangodb/arangodb:3.9.0`
+
+
+## features i don't personally need but are missing
+- [ ] provide tls cert to arango
+- [ ] authentication that is not basic auth
+- [ ] synchronise endpoint interval config
+- [ ] automatic creation of custom index
+- [ ] configure default arangodb collection sharding rules
+- [ ] configure default arangodb collection replication rules
+
+
+## complexity
+
+ok, so assuming the persistent index behaves as described at https://www.arangodb.com/docs/stable/indexing-index-basics.html#persistent-index, the expected complexities are:
+
+O(1)
+- InsertEntry
+- UpdateEntry
+- FindEntry
+- DeleteEntry
+- KvPut
+- KvGet
+- KvDelete
+
+O(log(BUCKET_SIZE))
+- DeleteFolderChildren
+
+O(log(DIRECTORY_SIZE))
+- ListDirectoryEntries
+- ListDirectoryPrefixedEntries
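+
+for reference, a directory listing boils down to an AQL query of roughly this shape
+(a sketch with illustrative values inlined; the store binds `@dir` and builds the name
+filters from the caller's arguments), which the persistent index on (directory, name)
+can answer without scanning the whole collection:
+
+```
+for d in seaweed_no_bucket
+    filter d.name > "start-file" && starts_with(d.name, "some-prefix")
+    filter d.directory == @dir
+    sort d.name asc
+    limit 100
+    return d
+```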