diff options
| author | elee <eddy@gfxlabs.io> | 2022-03-17 04:49:26 -0500 |
|---|---|---|
| committer | elee <eddy@gfxlabs.io> | 2022-03-17 04:49:26 -0500 |
| commit | 921535001a62424791549d51f6c91f3baefd2f1b (patch) | |
| tree | f346154d7711dbed8bede2eea3e96f334ecf9e64 /weed/filer/arangodb | |
| parent | 68cf6a922954587d0457a100a7576afc209f26eb (diff) | |
| download | seaweedfs-921535001a62424791549d51f6c91f3baefd2f1b.tar.xz seaweedfs-921535001a62424791549d51f6c91f3baefd2f1b.zip | |
arangodb adapter
Diffstat (limited to 'weed/filer/arangodb')
| -rw-r--r-- | weed/filer/arangodb/arangodb_store.go | 348 | ||||
| -rw-r--r-- | weed/filer/arangodb/arangodb_store_kv.go | 62 |
2 files changed, 410 insertions, 0 deletions
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go new file mode 100644 index 000000000..3cc9e14c9 --- /dev/null +++ b/weed/filer/arangodb/arangodb_store.go @@ -0,0 +1,348 @@ +package arangodb + +import ( + "context" + "crypto/md5" + "crypto/tls" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "time" + + "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver/http" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + filer.Stores = append(filer.Stores, &ArangodbStore{}) +} + +type ArangodbStore struct { + connect driver.Connection + client driver.Client + database driver.Database + collection driver.Collection +} + +type Model struct { + Key string `json:"_key"` + Directory string `json:"directory"` + Name string `json:"name"` + Meta []uint64 `json:"meta"` +} + +func (store *ArangodbStore) GetName() string { + return "arangodb" +} + +func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.connection(configuration.GetStringSlice(prefix+"arango_host"), + configuration.GetString(prefix+"arango_user"), + configuration.GetString(prefix+"arango_pass"), + ) +} + +func (store *ArangodbStore) connection(uris []string, user string, pass string) (err error) { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + + store.connect, err = http.NewConnection(http.ConnectionConfig{ + Endpoints: uris, + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }) + if err != nil { + return err + } + store.client, err = driver.NewClient(driver.ClientConfig{ + Connection: store.connect, + Authentication: driver.BasicAuthentication(user, pass), + }) + if err != nil { + return err + } + db_name := "seaweed-filer" + ok, err := store.client.DatabaseExists(ctx, db_name) + if err != nil { + return err + } + if ok { + store.database, err = store.client.Database(ctx, db_name) + } else { + store.database, err = store.client.CreateDatabase(ctx, db_name, &driver.CreateDatabaseOptions{}) + } + if err != nil { + return err + } + + coll_name := "files" + ok, err = store.database.CollectionExists(ctx, coll_name) + if err != nil { + return err + } + if ok { + store.collection, err = store.database.Collection(ctx, coll_name) + } else { + store.collection, err = store.database.CreateCollection(ctx, coll_name, &driver.CreateCollectionOptions{}) + } + if err != nil { + return err + } + + // ensure indices + + if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory", "name"}, &driver.EnsurePersistentIndexOptions{ + Name: "directory_name_multi", + Unique: true, + }); err != nil { + return err + } + if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory"}, + &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil { + return err + } + + if _, _, err = store.collection.EnsureFullTextIndex(ctx, []string{"directory_fulltext"}, + &driver.EnsureFullTextIndexOptions{Name: "IDX_FULLTEXT_directory", MinLength: 1}); err != nil { + return err + } + + if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{ + Name: "IDX_name", + }); err != nil { + return err + } + + return err +} + +type key int + +const ( + transactionKey key = 0 +) + +func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{ + Exclusive: []string{"files"}, + }, &driver.BeginTransactionOptions{}) + if err != nil { + return nil, err + } + + return context.WithValue(ctx, transactionKey, txn), nil +} + +func (store *ArangodbStore) CommitTransaction(ctx context.Context) error { + val := ctx.Value(transactionKey) + cast, ok := val.(driver.TransactionID) + if !ok { + return fmt.Errorf("txn cast fail %s:", val) + } + err := store.database.CommitTransaction(ctx, cast, &driver.CommitTransactionOptions{}) + if err != nil { + return err + } + return nil +} + +func (store *ArangodbStore) RollbackTransaction(ctx context.Context) error { + val := ctx.Value(transactionKey) + cast, ok := val.(driver.TransactionID) + if !ok { + return fmt.Errorf("txn cast fail %s:", val) + } + err := store.database.AbortTransaction(ctx, cast, &driver.AbortTransactionOptions{}) + if err != nil { + return err + } + return nil +} + +func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + model := &Model{ + Key: hashString(string(entry.FullPath)), + Directory: dir, + Name: name, + Meta: bytesToArray(meta), + } + _, err = store.collection.CreateDocument(ctx, model) + + if err != nil { + return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err) + } + + return nil + +} + +func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + model := &Model{ + Key: hashString(string(entry.FullPath)), + Directory: dir, + Name: name, + Meta: bytesToArray(meta), + } + + _, err = store.collection.UpdateDocument(ctx, model.Key, model) + + if err != nil { + return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err) + } + + return nil +} + +func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + var data Model + _, err = store.collection.ReadDocument(ctx, hashString(string(fullpath)), &data) + if driver.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + if err != nil { + glog.Errorf("find %s: %v", fullpath, err) + return nil, filer_pb.ErrNotFound + } + if len(data.Meta) == 0 { + return nil, filer_pb.ErrNotFound + } + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(arrayToBytes(data.Meta))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { + _, err := store.collection.RemoveDocument(ctx, hashString(string(fullpath))) + if err != nil { + glog.Errorf("find %s: %v", fullpath, err) + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + dir, _ := fullpath.DirAndName() + cur, err := store.database.Query(ctx, ` +for d in files +filter d.directory == @dir +remove d in files`, map[string]interface{}{"dir": dir}) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + defer cur.Close() + return nil +} + +func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + dir, name := dirPath.DirAndName() + eq := "" + if includeStartFile { + eq = "=" + } + _ = eq + _ = name + + cur, err := store.database.Query(ctx, fmt.Sprintf(` +for d in files +filter d.directory == @dir +sort d.name desc +limit %d +return d`, limit), map[string]interface{}{ + "dir": dir, + }) + if err != nil { + return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) + } + defer cur.Close() + for cur.HasMore() { + var data Model + _, err = cur.ReadDocument(ctx, &data) + if err != nil { + break + } + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), data.Name), + } + lastFileName = data.Name + converted := arrayToBytes(data.Meta) + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + + if !eachEntryFunc(entry) { + break + } + + } + return lastFileName, err +} + +func (store *ArangodbStore) Shutdown() { +} + +func hashString(dir string) string { + h := md5.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return hex.EncodeToString(b) +} + +func bytesToArray(bs []byte) []uint64 { + out := make([]uint64, 0, 2+len(bs)/8) + out = append(out, uint64(len(bs))) + for len(bs)%8 != 0 { + bs = append(bs, 0) + } + for i := 0; i < len(bs); i = i + 8 { + out = append(out, binary.BigEndian.Uint64(bs[i:])) + } + return out +} + +func arrayToBytes(xs []uint64) []byte { + if len(xs) < 2 { + return []byte{} + } + first := xs[0] + out := make([]byte, len(xs)*8) + for i := 1; i < len(xs); i = i + 1 { + binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i]) + } + return out[:first] +} diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go new file mode 100644 index 000000000..93caa75ed --- /dev/null +++ b/weed/filer/arangodb/arangodb_store_kv.go @@ -0,0 +1,62 @@ +package arangodb + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + dir, name := genDirAndName(key) + model := &Model{ + Key: hashString(string(key)), + Directory: dir, + Name: name, + Meta: bytesToArray(value), + } + + exists, err := store.collection.DocumentExists(ctx, model.Key) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + if exists { + _, err = store.collection.UpdateDocument(ctx, model.Key, model) + } else { + _, err = store.collection.CreateDocument(ctx, model) + } + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + var model Model + _, err = store.collection.ReadDocument(ctx, hashString(string(key)), &model) + if err != nil { + glog.Errorf("kv get: %v", err) + return nil, filer.ErrKvNotFound + } + return arrayToBytes(model.Meta), nil +} + +func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) { + _, err = store.collection.RemoveDocument(ctx, hashString(string(key))) + if err != nil { + glog.Errorf("kv del: %v", err) + return filer.ErrKvNotFound + } + return nil +} + +func genDirAndName(key []byte) (dir string, name string) { + for len(key) < 8 { + key = append(key, 0) + } + dir = string(key[:8]) + name = string(key[8:]) + return +} |
