| author | elee <eddy@gfxlabs.io> | 2022-03-18 21:51:16 -0500 |
|---|---|---|
| committer | elee <eddy@gfxlabs.io> | 2022-03-18 21:51:16 -0500 |
| commit | 411c0df3fec3344c3e5538cf56b54c1567162386 | |
| tree | 9879535202b9610043f4d71552db7c0a682dd347 /weed/filer/arangodb/arangodb_store.go | |
| parent | 1cea6c73d30ddee95b42335af74701a61c19ec84 | |
switch to multi collection, change readme
Diffstat (limited to 'weed/filer/arangodb/arangodb_store.go')
| -rw-r--r-- | weed/filer/arangodb/arangodb_store.go | 213 |
|---|---|---|

1 file changed, 78 insertions, 135 deletions
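
The patch below drops the single shared `files` collection and instead routes every filer operation through a per-bucket collection, with `seaweed_kvmeta` for KV metadata and `seaweed_no_bucket` for paths outside `/buckets`. The bucket-to-collection resolution itself happens in helpers (`extractBucketCollection`, `ensureCollection`) defined outside this file, so they do not appear in this diff. The Go sketch below only illustrates how that mapping and caching can work, reconstructed from the variables and struct fields the patch adds; `bucketForPath` and `collectionForPath` are hypothetical names, not the project's actual helpers.

```go
// Illustration only; would sit in the same package as the patched store.
package arangodb

import (
	"context"
	"strings"

	"github.com/arangodb/go-driver"
)

// bucketForPath maps a filer path to a collection name: paths under
// BUCKET_PREFIX get their bucket's own collection, everything else falls
// back to DEFAULT_COLLECTION. Hypothetical helper; the real mapping and any
// collection-name escaping may differ.
func bucketForPath(fullpath string) string {
	if !strings.HasPrefix(fullpath, BUCKET_PREFIX+"/") {
		return DEFAULT_COLLECTION
	}
	rest := strings.TrimPrefix(fullpath, BUCKET_PREFIX+"/")
	if i := strings.Index(rest, "/"); i >= 0 {
		rest = rest[:i]
	}
	if rest == "" {
		return DEFAULT_COLLECTION
	}
	return rest
}

// collectionForPath resolves and caches the driver.Collection for a path,
// using the buckets map and RWMutex the patch adds to ArangodbStore, and the
// ensureCollection helper the patch calls for the kvmeta collection.
func (store *ArangodbStore) collectionForPath(ctx context.Context, fullpath string) (driver.Collection, error) {
	name := bucketForPath(fullpath)

	store.mu.RLock()
	c, ok := store.buckets[name]
	store.mu.RUnlock()
	if ok {
		return c, nil
	}

	c, err := store.ensureCollection(ctx, name)
	if err != nil {
		return nil, err
	}

	store.mu.Lock()
	store.buckets[name] = c
	store.mu.Unlock()
	return c, nil
}
```

Caching resolved collections in `store.buckets` is also what lets the reworked `BeginTransaction` below declare every known collection, plus the kvmeta collection, as exclusive.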
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
index 23aefe40f..d27799b0e 100644
--- a/weed/filer/arangodb/arangodb_store.go
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -4,7 +4,9 @@ import (
     "context"
     "crypto/tls"
     "fmt"
+    "strconv"
     "strings"
+    "sync"
     "time"
 
     "github.com/arangodb/go-driver"
@@ -19,11 +21,20 @@ func init() {
     filer.Stores = append(filer.Stores, &ArangodbStore{})
 }
 
+var (
+    BUCKET_PREFIX      = "/buckets"
+    DEFAULT_COLLECTION = "seaweed_no_bucket"
+    KVMETA_COLLECTION  = "seaweed_kvmeta"
+)
+
 type ArangodbStore struct {
-    connect    driver.Connection
-    client     driver.Client
-    database   driver.Database
-    collection driver.Collection
+    connect      driver.Connection
+    client       driver.Client
+    database     driver.Database
+    kvCollection driver.Collection
+
+    buckets map[string]driver.Collection
+    mu      sync.RWMutex
 
     databaseName string
 }
@@ -32,7 +43,6 @@ type Model struct {
     Key       string `json:"_key"`
     Directory string `json:"directory,omitempty"`
     Name      string `json:"name,omitempty"`
-    Bucket    string `json:"bucket,omitempty"`
     Ttl       string `json:"ttl,omitempty"`
 
     //arangodb does not support binary blobs
@@ -46,6 +56,7 @@ func (store *ArangodbStore) GetName() string {
 }
 
 func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+    store.buckets = make(map[string]driver.Collection, 3)
     store.databaseName = configuration.GetString(prefix + "db_name")
     return store.connection(configuration.GetStringSlice(prefix+"servers"),
         configuration.GetString(prefix+"user"),
@@ -85,49 +96,7 @@ func (store *ArangodbStore) connection(uris []string, user string, pass string,
     if err != nil {
         return err
     }
-
-    coll_name := "files"
-    ok, err = store.database.CollectionExists(ctx, coll_name)
-    if err != nil {
-        return err
-    }
-    if ok {
-        store.collection, err = store.database.Collection(ctx, coll_name)
-    } else {
-        store.collection, err = store.database.CreateCollection(ctx, coll_name, &driver.CreateCollectionOptions{})
-    }
-    if err != nil {
-        return err
-    }
-
-    // ensure indices
-
-    if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory", "name"},
-        &driver.EnsurePersistentIndexOptions{
-            Name:   "directory_name_multi",
-            Unique: true,
-        }); err != nil {
-        return err
-    }
-    if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory"},
-        &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil {
-        return err
-    }
-
-    if _, _, err = store.collection.EnsureTTLIndex(ctx, "ttl", 1,
-        &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil {
-        return err
-    }
-
-    if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{
-        Name: "IDX_name",
-    }); err != nil {
-        return err
-    }
-    if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"bucket"}, &driver.EnsurePersistentIndexOptions{
-        Name:   "IDX_bucket",
-        Sparse: true, //sparse index, to locate files of bucket
-    }); err != nil {
+    if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil {
         return err
     }
     return err
@@ -140,8 +109,13 @@ const (
 )
 
 func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+    keys := make([]string, 0, len(store.buckets)+1)
+    for k := range store.buckets {
+        keys = append(keys, k)
+    }
+    keys = append(keys, store.kvCollection.Name())
     txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{
-        Exclusive: []string{"files"},
+        Exclusive: keys,
     }, &driver.BeginTransactionOptions{})
     if err != nil {
         return nil, err
@@ -186,23 +160,27 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
     if len(entry.Chunks) > 50 {
         meta = util.MaybeGzipData(meta)
     }
-    bucket, _ := extractBucket(entry.FullPath)
     model := &Model{
         Key:       hashString(string(entry.FullPath)),
         Directory: dir,
         Name:      name,
        Meta:      bytesToArray(meta),
-        Bucket:    bucket,
     }
     if entry.TtlSec > 0 {
         model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339)
     } else {
         model.Ttl = ""
     }
-    _, err = store.collection.CreateDocument(ctx, model)
+
+    targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+    if err != nil {
+        return err
+    }
+    _, err = targetCollection.CreateDocument(ctx, model)
     if driver.IsConflict(err) {
         return store.UpdateEntry(ctx, entry)
     }
+
     if err != nil {
         return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err)
     }
@@ -211,21 +189,6 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
 
 }
 
-func extractBucket(fullpath util.FullPath) (string, string) {
-    if !strings.HasPrefix(string(fullpath), "/buckets/") {
-        return "", string(fullpath)
-    }
-    bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
-    t := strings.Index(bucketAndObjectKey, "/")
-    bucket := bucketAndObjectKey
-    shortPath := "/"
-    if t > 0 {
-        bucket = bucketAndObjectKey[:t]
-        shortPath = string(util.FullPath(bucketAndObjectKey[t:]))
-    }
-    return bucket, shortPath
-}
-
 func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
     dir, name := entry.FullPath.DirAndName()
     meta, err := entry.EncodeAttributesAndChunks()
@@ -247,9 +210,11 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
     } else {
         model.Ttl = "none"
     }
-
-    _, err = store.collection.UpdateDocument(ctx, model.Key, model)
-
+    targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+    if err != nil {
+        return err
+    }
+    _, err = targetCollection.UpdateDocument(ctx, model.Key, model)
     if err != nil {
         return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err)
     }
@@ -259,11 +224,15 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
 
 func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
     var data Model
-    _, err = store.collection.ReadDocument(ctx, hashString(string(fullpath)), &data)
-    if driver.IsNotFound(err) {
-        return nil, filer_pb.ErrNotFound
+    targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+    if err != nil {
+        return nil, err
     }
+    _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data)
     if err != nil {
+        if driver.IsNotFound(err) {
+            return nil, filer_pb.ErrNotFound
+        }
         glog.Errorf("find %s: %v", fullpath, err)
         return nil, filer_pb.ErrNotFound
     }
@@ -281,8 +250,12 @@ func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPat
     return entry, nil
 }
 
-func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
-    _, err := store.collection.RemoveDocument(ctx, hashString(string(fullpath)))
+func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+    targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+    if err != nil {
+        return err
+    }
+    _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath)))
     if err != nil && !driver.IsNotFound(err) {
         glog.Errorf("find %s: %v", fullpath, err)
         return fmt.Errorf("delete %s : %v", fullpath, err)
@@ -290,14 +263,21 @@ func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullP
     return nil
 }
 
-func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+// this runs in log time
+func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
     var query string
+    targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+    if err != nil {
+        return err
+    }
     query = query + fmt.Sprintf(`
-    for d in files
+    for d in %s
     filter starts_with(d.directory, "%s/") || d.directory == "%s"
-    remove d._key in files`,
+    remove d._key in %s`,
+        targetCollection.Name(),
         strings.Join(strings.Split(string(fullpath), "/"), ","),
         string(fullpath),
+        targetCollection.Name(),
     )
     cur, err := store.database.Query(ctx, query, nil)
     if err != nil {
@@ -307,70 +287,33 @@ func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath u
     return nil
 }
 
+func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+    return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
 func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
-    // if no prefix, then dont use index
-    if prefix == "" {
-        return store.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
+    targetCollection, err := store.extractBucketCollection(ctx, dirPath)
+    if err != nil {
+        return lastFileName, err
     }
-    eq := ""
+    query := "for d in " + targetCollection.Name()
     if includeStartFile {
-        eq = "filter d.name >= \"" + startFileName + "\""
+        query = query + " filter d.name >= \"" + startFileName + "\" "
     } else {
-        eq = "filter d.name > \"" + startFileName + "\""
+        query = query + " filter d.name > \"" + startFileName + "\" "
     }
-    query := fmt.Sprintf(`
-for d in files
+    if prefix != "" {
+        query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix)
+    }
+    query = query + `
 filter d.directory == @dir
-filter starts_with(d.name, @prefix)
-%s
 sort d.name asc
-limit %d
-return d`, eq, limit)
-    cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath, "prefix": prefix})
-    if err != nil {
-        return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
+`
+    if limit > 0 {
+        query = query + "limit " + strconv.Itoa(int(limit))
     }
-    defer cur.Close()
-    for cur.HasMore() {
-        var data Model
-        _, err = cur.ReadDocument(ctx, &data)
-        if err != nil {
-            break
-        }
-        entry := &filer.Entry{
-            FullPath: util.NewFullPath(data.Directory, data.Name),
-        }
-        lastFileName = data.Name
-        converted := arrayToBytes(data.Meta)
-        if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil {
-            err = decodeErr
-            glog.V(0).Infof("list %s : %v", entry.FullPath, err)
-            break
-        }
-
-        if !eachEntryFunc(entry) {
-            break
-        }
-
-    }
-    return lastFileName, err
-}
-
-func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
-    eq := ""
-    if includeStartFile {
-        eq = "filter d.name >= \"" + startFileName + "\""
-    } else {
-        eq = "filter d.name > \"" + startFileName + "\""
-    }
-    query := fmt.Sprintf(`
-for d in files
-filter d.directory == "%s"
-%s
-sort d.name asc
-limit %d
-return d`, string(dirPath), eq, limit)
-    cur, err := store.database.Query(ctx, query, nil)
+    query = query + "\n return d"
+    cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath})
     if err != nil {
         return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
     }
@@ -382,7 +325,7 @@ return d`, string(dirPath), eq, limit)
             break
         }
         entry := &filer.Entry{
-            FullPath: util.NewFullPath(string(dirPath), data.Name),
+            FullPath: util.NewFullPath(data.Directory, data.Name),
         }
         lastFileName = data.Name
         converted := arrayToBytes(data.Meta)
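
The collection bootstrap that previously ran inline in `connection()` (create the collection, then ensure the directory/name, directory, TTL, and name indexes) is removed from this file; equivalent per-collection setup presumably moves into the `ensureCollection` helper the new code calls, which is defined elsewhere in the package. The sketch below merely restates the removed setup in reusable, per-collection form, dropping the sparse `bucket` index that the multi-collection layout makes unnecessary; it belongs in the same package as the patched store and is deliberately named `ensureCollectionSketch` because it is not the project's actual implementation.

```go
// ensureCollectionSketch approximates what the patch's ensureCollection helper
// presumably does, by reusing the setup removed from connection(): create the
// collection if it is missing, then ensure the same indexes, now per collection.
// Sketch only; the real helper lives in another file and may differ.
func (store *ArangodbStore) ensureCollectionSketch(ctx context.Context, name string) (driver.Collection, error) {
	ok, err := store.database.CollectionExists(ctx, name)
	if err != nil {
		return nil, err
	}
	var coll driver.Collection
	if ok {
		coll, err = store.database.Collection(ctx, name)
	} else {
		coll, err = store.database.CreateCollection(ctx, name, &driver.CreateCollectionOptions{})
	}
	if err != nil {
		return nil, err
	}

	// same indexes the old single-collection code ensured, minus IDX_bucket
	if _, _, err = coll.EnsurePersistentIndex(ctx, []string{"directory", "name"},
		&driver.EnsurePersistentIndexOptions{Name: "directory_name_multi", Unique: true}); err != nil {
		return nil, err
	}
	if _, _, err = coll.EnsurePersistentIndex(ctx, []string{"directory"},
		&driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil {
		return nil, err
	}
	if _, _, err = coll.EnsureTTLIndex(ctx, "ttl", 1,
		&driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil {
		return nil, err
	}
	if _, _, err = coll.EnsurePersistentIndex(ctx, []string{"name"},
		&driver.EnsurePersistentIndexOptions{Name: "IDX_name"}); err != nil {
		return nil, err
	}
	return coll, nil
}
```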
