Diffstat (limited to 'weed/filer')
95 files changed, 4794 insertions, 698 deletions
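One change recurs across the abstract_sql, arangodb, cassandra, and etcd store diffs below: the hard-coded chunk-count threshold `50` for gzipping serialized entry metadata is replaced by the shared constant `filer.CountEntryChunksForGzip`. A minimal sketch of the pattern, assuming the new constant keeps the old literal's value of 50; the helper `encodeEntryMeta` is hypothetical, for illustration only:

```go
package example

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// encodeEntryMeta shows the shared pattern: serialize the entry, then
// gzip only when it carries enough chunks for compression to pay off.
func encodeEntryMeta(entry *filer.Entry) ([]byte, error) {
	meta, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return nil, fmt.Errorf("encode %s: %s", entry.FullPath, err)
	}
	if len(entry.Chunks) > filer.CountEntryChunksForGzip {
		meta = util.MaybeGzipData(meta) // keeps the original bytes if gzip does not shrink them
	}
	return meta, nil
}
```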
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 4bf9b16fa..13268b944 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -156,7 +156,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 03b016c76..aaf1c196c 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -18,7 +18,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by return fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value) if err == nil { @@ -53,7 +53,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b return nil, fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr) err = row.Scan(&value) @@ -76,7 +76,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er return fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr) if err != nil { @@ -92,7 +92,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er } -func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) { +func GenDirAndName(key []byte) (dirStr string, dirHash int64, name string) { for len(key) < 8 { key = append(key, 0) } diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go new file mode 100644 index 000000000..13d14b2b0 --- /dev/null +++ b/weed/filer/arangodb/arangodb_store.go @@ -0,0 +1,347 @@ +package arangodb + +import ( + "context" + "crypto/tls" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver/http" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + filer.Stores = append(filer.Stores, &ArangodbStore{}) +} + +var ( + BUCKET_PREFIX = "/buckets" + DEFAULT_COLLECTION = "seaweed_no_bucket" + KVMETA_COLLECTION = "seaweed_kvmeta" +) + +type ArangodbStore struct { + connect driver.Connection + client driver.Client + database driver.Database + kvCollection driver.Collection + + buckets map[string]driver.Collection + mu sync.RWMutex + + databaseName string +} + +type Model struct { + Key string `json:"_key"` + Directory string `json:"directory,omitempty"` + Name string `json:"name,omitempty"` + Ttl string `json:"ttl,omitempty"` + + //arangodb does not support binary blobs + //we encode byte slice into uint64 slice + //see helpers.go + Meta []uint64 `json:"meta"` +} + +func (store *ArangodbStore) GetName() string { + return "arangodb" +} + +func (store *ArangodbStore) 
Initialize(configuration util.Configuration, prefix string) (err error) { + store.buckets = make(map[string]driver.Collection, 3) + store.databaseName = configuration.GetString(prefix + "db_name") + return store.connection(configuration.GetStringSlice(prefix+"servers"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"insecure_skip_verify"), + ) +} + +func (store *ArangodbStore) connection(uris []string, user string, pass string, insecure bool) (err error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + store.connect, err = http.NewConnection(http.ConnectionConfig{ + Endpoints: uris, + TLSConfig: &tls.Config{ + InsecureSkipVerify: insecure, + }, + }) + if err != nil { + return err + } + store.client, err = driver.NewClient(driver.ClientConfig{ + Connection: store.connect, + Authentication: driver.BasicAuthentication(user, pass), + }) + if err != nil { + return err + } + ok, err := store.client.DatabaseExists(ctx, store.databaseName) + if err != nil { + return err + } + if ok { + store.database, err = store.client.Database(ctx, store.databaseName) + } else { + store.database, err = store.client.CreateDatabase(ctx, store.databaseName, &driver.CreateDatabaseOptions{}) + } + if err != nil { + return err + } + if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil { + return err + } + return err +} + +type key int + +const ( + transactionKey key = 0 +) + +func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + keys := make([]string, 0, len(store.buckets)+1) + for k := range store.buckets { + keys = append(keys, k) + } + keys = append(keys, store.kvCollection.Name()) + txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{ + Exclusive: keys, + }, &driver.BeginTransactionOptions{}) + if err != nil { + return nil, err + } + + return context.WithValue(ctx, transactionKey, txn), nil +} + +func (store *ArangodbStore) CommitTransaction(ctx context.Context) error { + val := ctx.Value(transactionKey) + cast, ok := val.(driver.TransactionID) + if !ok { + return fmt.Errorf("txn cast fail: %v", val) + } + err := store.database.CommitTransaction(ctx, cast, &driver.CommitTransactionOptions{}) + if err != nil { + return err + } + return nil +} + +func (store *ArangodbStore) RollbackTransaction(ctx context.Context) error { + val := ctx.Value(transactionKey) + cast, ok := val.(driver.TransactionID) + if !ok { + return fmt.Errorf("txn cast fail: %v", val) + } + err := store.database.AbortTransaction(ctx, cast, &driver.AbortTransactionOptions{}) + if err != nil { + return err + } + return nil +} + +func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + model := &Model{ + Key: hashString(string(entry.FullPath)), + Directory: dir, + Name: name, + Meta: bytesToArray(meta), + } + if entry.TtlSec > 0 { + model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339) + } else { + model.Ttl = "" + } + + targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath) + if err != nil { + return err + } + _, err = targetCollection.CreateDocument(ctx, model) + if
driver.IsConflict(err) { + return store.UpdateEntry(ctx, entry) + } + + if err != nil { + return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err) + } + + return nil + +} + +func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + model := &Model{ + Key: hashString(string(entry.FullPath)), + Directory: dir, + Name: name, + Meta: bytesToArray(meta), + } + if entry.TtlSec > 0 { + model.Ttl = time.Now().Add(time.Duration(entry.TtlSec) * time.Second).Format(time.RFC3339) + } else { + model.Ttl = "none" + } + targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath) + if err != nil { + return err + } + _, err = targetCollection.UpdateDocument(ctx, model.Key, model) + if err != nil { + return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err) + } + + return nil +} + +func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + var data Model + targetCollection, err := store.extractBucketCollection(ctx, fullpath) + if err != nil { + return nil, err + } + _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data) + if err != nil { + if driver.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + glog.Errorf("find %s: %v", fullpath, err) + return nil, filer_pb.ErrNotFound + } + if len(data.Meta) == 0 { + return nil, filer_pb.ErrNotFound + } + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(arrayToBytes(data.Meta))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + targetCollection, err := store.extractBucketCollection(ctx, fullpath) + if err != nil { + return err + } + _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath))) + if err != nil && !driver.IsNotFound(err) { + glog.Errorf("find %s: %v", fullpath, err) + return fmt.Errorf("delete %s : %v", fullpath, err) + } + return nil +} + +// this runs in log time +func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + var query string + targetCollection, err := store.extractBucketCollection(ctx, fullpath) + if err != nil { + return err + } + query = query + fmt.Sprintf(` + for d in %s + filter starts_with(d.directory, "%s/") || d.directory == "%s" + remove d._key in %s`, + targetCollection.Name(), + strings.Join(strings.Split(string(fullpath), "/"), ","), + string(fullpath), + targetCollection.Name(), + ) + cur, err := store.database.Query(ctx, query, nil) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + defer cur.Close() + return nil +} + +func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, 
includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/") + if err != nil { + return lastFileName, err + } + query := "for d in " + targetCollection.Name() + if includeStartFile { + query = query + " filter d.name >= \"" + startFileName + "\" " + } else { + query = query + " filter d.name > \"" + startFileName + "\" " + } + if prefix != "" { + query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix) + } + query = query + ` +filter d.directory == @dir +sort d.name asc +` + if limit > 0 { + query = query + "limit " + strconv.Itoa(int(limit)) + } + query = query + "\n return d" + cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath}) + if err != nil { + return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) + } + defer cur.Close() + for cur.HasMore() { + var data Model + _, err = cur.ReadDocument(ctx, &data) + if err != nil { + break + } + entry := &filer.Entry{ + FullPath: util.NewFullPath(data.Directory, data.Name), + } + lastFileName = data.Name + converted := arrayToBytes(data.Meta) + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + + if !eachEntryFunc(entry) { + break + } + + } + return lastFileName, err +} + +func (store *ArangodbStore) Shutdown() { +} diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go new file mode 100644 index 000000000..810d639a7 --- /dev/null +++ b/weed/filer/arangodb/arangodb_store_bucket.go @@ -0,0 +1,40 @@ +package arangodb + +import ( + "context" + "github.com/arangodb/go-driver" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +var _ filer.BucketAware = (*ArangodbStore)(nil) + +func (store *ArangodbStore) OnBucketCreation(bucket string) { + timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // create the collection && add to cache + _, err := store.ensureBucket(timeout, bucket) + if err != nil { + glog.Errorf("bucket create %s: %v", bucket, err) + } +} +func (store *ArangodbStore) OnBucketDeletion(bucket string) { + timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + collection, err := store.ensureBucket(timeout, bucket) + if err != nil { + glog.Errorf("bucket delete %s: %v", bucket, err) + return + } + err = collection.Remove(timeout) + if err != nil && !driver.IsNotFound(err) { + glog.Errorf("bucket delete %s: %v", bucket, err) + return + } +} +func (store *ArangodbStore) CanDropWholeBucket() bool { + return true +} diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go new file mode 100644 index 000000000..c1307e78d --- /dev/null +++ b/weed/filer/arangodb/arangodb_store_kv.go @@ -0,0 +1,54 @@ +package arangodb + +import ( + "context" + "fmt" + + "github.com/arangodb/go-driver" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + model := &Model{ + Key: hashString(".kvstore." + string(key)), + Directory: ".kvstore." 
+ string(key), + Meta: bytesToArray(value), + } + + exists, err := store.kvCollection.DocumentExists(ctx, model.Key) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + if exists { + _, err = store.kvCollection.UpdateDocument(ctx, model.Key, model) + } else { + _, err = store.kvCollection.CreateDocument(ctx, model) + } + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} +func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + var model Model + _, err = store.kvCollection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model) + if driver.IsNotFound(err) { + return nil, filer.ErrKvNotFound + } + if err != nil { + glog.Errorf("kv get: %s %v", string(key), err) + return nil, filer.ErrKvNotFound + } + return arrayToBytes(model.Meta), nil +} + +func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) { + _, err = store.kvCollection.RemoveDocument(ctx, hashString(".kvstore."+string(key))) + if err != nil { + glog.Errorf("kv del: %v", err) + return filer.ErrKvNotFound + } + return nil +} diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go new file mode 100644 index 000000000..943189781 --- /dev/null +++ b/weed/filer/arangodb/helpers.go @@ -0,0 +1,136 @@ +package arangodb + +import ( + "context" + "crypto/md5" + "encoding/binary" + "encoding/hex" + "io" + "strings" + + "github.com/arangodb/go-driver" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// convert a string into an arango-key-safe hex hash +func hashString(dir string) string { + h := md5.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return hex.EncodeToString(b) +} + +// convert a slice of bytes into a slice of uint64 +// the first uint64 indicates the length in bytes +func bytesToArray(bs []byte) []uint64 { + out := make([]uint64, 0, 2+len(bs)/8) + out = append(out, uint64(len(bs))) + for len(bs)%8 != 0 { + bs = append(bs, 0) + } + for i := 0; i < len(bs); i = i + 8 { + out = append(out, binary.BigEndian.Uint64(bs[i:])) + } + return out +} + +// convert from a slice of uint64 back to bytes +// if the input length is 0 or 1, returns nil +func arrayToBytes(xs []uint64) []byte { + if len(xs) < 2 { + return nil + } + first := xs[0] + out := make([]byte, len(xs)*8) // this could be len(xs)*8-8, but the extra 8 bytes don't hurt + for i := 1; i < len(xs); i = i + 1 { + binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i]) + } + return out[:first] +} + +// get the collection that the bucket in fullpath points to +func (store *ArangodbStore) extractBucketCollection(ctx context.Context, fullpath util.FullPath) (c driver.Collection, err error) { + bucket, _ := extractBucket(fullpath) + if bucket == "" { + bucket = DEFAULT_COLLECTION + } + c, err = store.ensureBucket(ctx, bucket) + if err != nil { + return nil, err + } + return c, err +} + +// called by extractBucketCollection +func extractBucket(fullpath util.FullPath) (string, string) { + if !strings.HasPrefix(string(fullpath), BUCKET_PREFIX+"/") { + return "", string(fullpath) + } + if strings.Count(string(fullpath), "/") < 3 { + return "", string(fullpath) + } + bucketAndObjectKey := string(fullpath)[len(BUCKET_PREFIX+"/"):] + t := strings.Index(bucketAndObjectKey, "/") + bucket := bucketAndObjectKey + shortPath := "/" + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = string(util.FullPath(bucketAndObjectKey[t:])) + } + return bucket, shortPath +} + +// get the bucket collection from the cache;
if it does not exist, create it and add it to the cache +func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc driver.Collection, err error) { + var ok bool + store.mu.RLock() + bc, ok = store.buckets[bucket] + store.mu.RUnlock() + if ok { + return bc, nil + } + store.mu.Lock() + defer store.mu.Unlock() + store.buckets[bucket], err = store.ensureCollection(ctx, bucket) + if err != nil { + return nil, err + } + return store.buckets[bucket], nil +} + +// create the collection if it does not exist, and ensure its indices exist +func (store *ArangodbStore) ensureCollection(ctx context.Context, name string) (c driver.Collection, err error) { + ok, err := store.database.CollectionExists(ctx, name) + if err != nil { + return + } + if ok { + c, err = store.database.Collection(ctx, name) + } else { + c, err = store.database.CreateCollection(ctx, name, &driver.CreateCollectionOptions{}) + } + if err != nil { + return + } + // ensure indices + if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory", "name"}, + &driver.EnsurePersistentIndexOptions{ + Name: "directory_name_multi", Unique: true, + }); err != nil { + return + } + if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory"}, + &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil { + return + } + if _, _, err = c.EnsureTTLIndex(ctx, "ttl", 1, + &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil { + return + } + if _, _, err = c.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{ + Name: "IDX_name", + }); err != nil { + return + } + return c, nil +} diff --git a/weed/filer/arangodb/readme.md b/weed/filer/arangodb/readme.md new file mode 100644 index 000000000..e189811fb --- /dev/null +++ b/weed/filer/arangodb/readme.md @@ -0,0 +1,52 @@ +## arangodb + +database: https://github.com/arangodb/arangodb +go driver: https://github.com/arangodb/go-driver + +options: + +``` +[arangodb] +enabled=true +db_name="seaweedfs" +servers=["http://localhost:8529"] +# basic auth +username="root" +password="test" + +# tls settings +insecure_skip_verify=true +``` + +I test using this dev database: +`docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=test arangodb/arangodb:3.9.0` + + +## features I don't personally need but are missing +- [ ] provide a TLS cert to arango +- [ ] authentication that is not basic auth +- [ ] synchronise endpoint interval config +- [ ] automatic creation of custom indexes +- [ ] configure default arangodb collection sharding rules +- [ ] configure default arangodb collection replication rules + + +## complexity + +ok, so if https://www.arangodb.com/docs/stable/indexing-index-basics.html#persistent-index is correct: + +O(1) +- InsertEntry +- UpdateEntry +- FindEntry +- DeleteEntry +- KvPut +- KvGet +- KvDelete + +O(log(BUCKET_SIZE)) +- DeleteFolderChildren + +O(log(DIRECTORY_SIZE)) +- ListDirectoryEntries +- ListDirectoryPrefixedEntries
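ArangoDB documents cannot hold raw binary blobs, so helpers.go above widens the serialized entry metadata into a JSON-safe `[]uint64`, recording the true byte length in the first element so the zero padding can be stripped on decode. A standalone round-trip sketch of that encoding (same big-endian layout as the helpers, using the tighter `len(xs)*8-8` buffer sizing the code comment mentions):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// bytesToArray packs a byte slice into uint64s; element 0 stores the
// original length so arrayToBytes can strip the zero padding again.
func bytesToArray(bs []byte) []uint64 {
	out := make([]uint64, 0, 2+len(bs)/8)
	out = append(out, uint64(len(bs)))
	for len(bs)%8 != 0 {
		bs = append(bs, 0) // pad to a whole number of uint64s
	}
	for i := 0; i < len(bs); i += 8 {
		out = append(out, binary.BigEndian.Uint64(bs[i:]))
	}
	return out
}

// arrayToBytes reverses the packing, truncating to the recorded length.
func arrayToBytes(xs []uint64) []byte {
	if len(xs) < 2 {
		return nil
	}
	out := make([]byte, (len(xs)-1)*8)
	for i := 1; i < len(xs); i++ {
		binary.BigEndian.PutUint64(out[(i-1)*8:], xs[i])
	}
	return out[:xs[0]]
}

func main() {
	meta := []byte("entry metadata, any length")
	packed := bytesToArray(meta)
	if !bytes.Equal(arrayToBytes(packed), meta) {
		panic("round trip failed")
	}
	fmt.Printf("%d bytes -> %d uint64s\n", len(meta), len(packed))
}
```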
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index fc0b52ac7..d8c094a45 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/gocql/gocql" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -33,6 +34,7 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix configuration.GetString(prefix+"password"), configuration.GetStringSlice(prefix+"superLargeDirectories"), configuration.GetString(prefix+"localDC"), + configuration.GetInt(prefix+"connection_timeout_millisecond"), + ) } @@ -41,12 +43,14 @@ func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, return } -func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string) (err error) { +func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string, timeout int) (err error) { store.cluster = gocql.NewCluster(hosts...) if username != "" && password != "" { store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password} } store.cluster.Keyspace = keyspace + store.cluster.Timeout = time.Duration(timeout) * time.Millisecond + glog.V(0).Infof("timeout = %d", timeout) fallback := gocql.RoundRobinHostPolicy() if localDC != "" { fallback = gocql.DCAwareRoundRobinPolicy(localDC) @@ -96,7 +100,7 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go index 9ef2f3e0f..85fc65d13 100644 --- a/weed/filer/configuration.go +++ b/weed/filer/configuration.go @@ -12,7 +12,7 @@ var ( Stores []FilerStore ) -func (f *Filer) LoadConfiguration(config *util.ViperProxy) { +func (f *Filer) LoadConfiguration(config *util.ViperProxy) (isFresh bool) { validateOneEnabledStore(config) @@ -24,7 +24,7 @@ func (f *Filer) LoadConfiguration(config *util.ViperProxy) { if err := store.Initialize(config, store.GetName()+"."); err != nil { glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err) } - f.SetStore(store) + isFresh = f.SetStore(store) glog.V(0).Infof("configured filer store to %s", store.GetName()) hasDefaultStoreConfigured = true break @@ -77,6 +77,7 @@ func (f *Filer) LoadConfiguration(config *util.ViperProxy) { glog.V(0).Infof("configure filer %s for %s", store.GetName(), location) } + return } func validateOneEnabledStore(config *util.ViperProxy) { diff --git a/weed/filer/elastic/v7/doc.go b/weed/filer/elastic/v7/doc.go new file mode 100644 index 000000000..704bbf6de --- /dev/null +++ b/weed/filer/elastic/v7/doc.go @@ -0,0 +1,9 @@ +/* + +Package elastic is for the elastic filer store. + +The referenced "github.com/olivere/elastic/v7" library is too big when compiled. +So this is only compiled in "make full_install".
+ +*/ +package elastic diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index a16e5ebca..cb2c66f5a 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -1,3 +1,6 @@ +//go:build elastic +// +build elastic + package elastic import ( diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go index 99c03314e..43835c153 100644 --- a/weed/filer/elastic/v7/elastic_store_kv.go +++ b/weed/filer/elastic/v7/elastic_store_kv.go @@ -1,3 +1,6 @@ +//go:build elastic +// +build elastic + package elastic import ( diff --git a/weed/filer/entry.go b/weed/filer/entry.go index 8fa75fe6b..8dd00f010 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -15,15 +15,14 @@ type Attr struct { Uid uint32 // owner uid Gid uint32 // group gid Mime string // mime type - Replication string // replication - Collection string // collection name TtlSec int32 // ttl in seconds - DiskType string UserName string GroupNames []string SymlinkTarget string Md5 []byte FileSize uint64 + Rdev uint32 + Inode uint64 } func (attr Attr) IsDirectory() bool { @@ -43,6 +42,7 @@ type Entry struct { HardLinkCounter int32 Content []byte Remote *filer_pb.RemoteEntry + Quota int64 } func (entry *Entry) Size() uint64 { @@ -70,6 +70,7 @@ func (entry *Entry) ShallowClone() *Entry { newEntry.HardLinkCounter = entry.HardLinkCounter newEntry.Content = entry.Content newEntry.Remote = entry.Remote + newEntry.Quota = entry.Quota return newEntry } @@ -96,6 +97,7 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) { message.HardLinkCounter = entry.HardLinkCounter message.Content = entry.Content message.RemoteEntry = entry.Remote + message.Quota = entry.Quota } func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) { @@ -106,6 +108,7 @@ func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) { fsEntry.HardLinkCounter = message.HardLinkCounter fsEntry.Content = message.Content fsEntry.Remote = message.RemoteEntry + fsEntry.Quota = message.Quota } func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go index 55c937b39..3d29ba0b4 100644 --- a/weed/filer/entry_codec.go +++ b/weed/filer/entry_codec.go @@ -39,15 +39,14 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { Uid: entry.Uid, Gid: entry.Gid, Mime: entry.Mime, - Collection: entry.Attr.Collection, - Replication: entry.Attr.Replication, TtlSec: entry.Attr.TtlSec, - DiskType: entry.Attr.DiskType, UserName: entry.Attr.UserName, GroupName: entry.Attr.GroupNames, SymlinkTarget: entry.Attr.SymlinkTarget, Md5: entry.Attr.Md5, FileSize: entry.Attr.FileSize, + Rdev: entry.Attr.Rdev, + Inode: entry.Attr.Inode, } } @@ -65,15 +64,14 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t.Uid = attr.Uid t.Gid = attr.Gid t.Mime = attr.Mime - t.Collection = attr.Collection - t.Replication = attr.Replication t.TtlSec = attr.TtlSec - t.DiskType = attr.DiskType t.UserName = attr.UserName t.GroupNames = attr.GroupName t.SymlinkTarget = attr.SymlinkTarget t.Md5 = attr.Md5 t.FileSize = attr.FileSize + t.Rdev = attr.Rdev + t.Inode = attr.Inode return t } @@ -118,6 +116,9 @@ func EqualEntry(a, b *Entry) bool { if !proto.Equal(a.Remote, b.Remote) { return false } + if a.Quota != b.Quota { + return false + } return true } diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 71ed738f9..0dd7dbee2 100644 --- 
a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/client/v3" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -82,7 +82,7 @@ func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (er return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = weed_util.MaybeGzipData(meta) } @@ -152,7 +152,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u } resp, err := store.client.Get(ctx, string(lastFileStart), - clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + clientv3.WithFromKey(), clientv3.WithLimit(limit+1)) if err != nil { return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } diff --git a/weed/filer/etcd/etcd_store_test.go b/weed/filer/etcd/etcd_store_test.go new file mode 100644 index 000000000..824c28f5a --- /dev/null +++ b/weed/filer/etcd/etcd_store_test.go @@ -0,0 +1,16 @@ +package etcd + +import ( + "github.com/chrislusf/seaweedfs/weed/filer/store_test" + "testing" +) + +func TestStore(t *testing.T) { + // run "make test_etcd" under docker folder. + // to set up local env + if false { + store := &EtcdStore{} + store.initialize("localhost:2379", "3s") + store_test.TestFilerStore(t, store) + } +} diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 00dbf1fd6..4eb657dfa 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -3,11 +3,15 @@ package filer import ( "bytes" "fmt" - "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "math" + "net/url" + "strings" + "sync" "time" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/golang/protobuf/proto" "github.com/chrislusf/seaweedfs/weed/glog" @@ -19,6 +23,12 @@ const ( ManifestBatch = 10000 ) +var bytesBufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { for _, chunk := range chunks { if chunk.IsChunkManifest { @@ -54,17 +64,17 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) if err != nil { - return chunks, nil, err + return dataChunks, nil, err } manifestChunks = append(manifestChunks, chunk) // recursive - dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) + subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { - return chunks, nil, subErr + return dataChunks, nil, subErr } - dataChunks = append(dataChunks, dchunks...) - manifestChunks = append(manifestChunks, mchunks...) + dataChunks = append(dataChunks, subDataChunks...) + manifestChunks = append(manifestChunks, subManifestChunks...) 
} return } @@ -75,12 +85,15 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // IsChunkManifest - data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) + bytesBuffer.Reset() + defer bytesBufferPool.Put(bytesBuffer) + err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) if err != nil { return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) } m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { + if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil { return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) } @@ -90,26 +103,43 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // TODO fetch from cache for weed mount? -func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { +func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err + return err + } + err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0) + if err != nil { + return err } - return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0) + return nil } -func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) { +func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { + urlStrings, err := lookupFileIdFn(fileId) + if err != nil { + glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) + return 0, err + } + return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) +} + +func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { - var err error var shouldRetry bool - receivedData := make([]byte, 0, size) for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - receivedData = receivedData[:0] - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { - receivedData = append(receivedData, data...) 
+ n = 0 + if strings.Contains(urlString, "%") { + urlString = url.PathEscape(urlString) + } + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + if n < len(buffer) { + x := copy(buffer[n:], data) + n += x + } }) if !shouldRetry { break @@ -128,7 +158,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool } } - return receivedData, err + return n, err } diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 0dc03f6e2..48b344bf8 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -3,11 +3,12 @@ package filer import ( "bytes" "fmt" - "github.com/chrislusf/seaweedfs/weed/wdclient" "math" - "sort" "sync" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "golang.org/x/exp/slices" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,7 +24,16 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) { } func FileSize(entry *filer_pb.Entry) (size uint64) { - return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize) + if entry == nil || entry.Attributes == nil { + return 0 + } + fileSize := entry.Attributes.FileSize + if entry.RemoteEntry != nil { + if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime { + fileSize = maxUint64(fileSize, uint64(entry.RemoteEntry.RemoteSize)) + } + } + return maxUint64(TotalSize(entry.Chunks), fileSize) } func ETag(entry *filer_pb.Entry) (etag string) { @@ -101,6 +111,21 @@ func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { return } +func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { + + fileIds := make(map[string]bool) + for _, interval := range bs { + fileIds[interval.GetFileIdString()] = true + } + for _, chunk := range as { + if _, found := fileIds[chunk.GetSourceFileId()]; !found { + delta = append(delta, chunk) + } + } + + return +} + type ChunkView struct { FileId string Offset int64 @@ -224,19 +249,26 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) { chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset) + if err != nil { + return + } + + visibles2 := readResolvedChunks(chunks) - sort.Slice(chunks, func(i, j int) bool { - if chunks[i].Mtime == chunks[j].Mtime { - filer_pb.EnsureFid(chunks[i]) - filer_pb.EnsureFid(chunks[j]) - if chunks[i].Fid == nil || chunks[j].Fid == nil { + if true { + return visibles2, err + } + slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool { + if a.Mtime == b.Mtime { + filer_pb.EnsureFid(a) + filer_pb.EnsureFid(b) + if a.Fid == nil || b.Fid == nil { return true } - return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey + return a.Fid.FileKey < b.Fid.FileKey } - return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run + return a.Mtime < b.Mtime }) - for _, chunk := range chunks { // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) @@ -246,9 +278,26 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction } + if len(visibles) != len(visibles2) { + fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2)) + } else { + for i := 0; i < len(visibles); i++ { + 
checkDifference(visibles[i], visibles2[i]) + } + } + return } +func checkDifference(x, y VisibleInterval) { + if x.start != y.start || + x.stop != y.stop || + x.fileId != y.fileId || + x.modifiedTime != y.modifiedTime { + fmt.Printf("different visible %+v : %+v\n", x, y) + } +} + // find non-overlapping visible intervals // visible interval map to one file chunk diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go index 9f9566d9b..39dec87c9 100644 --- a/weed/filer/filechunks2_test.go +++ b/weed/filer/filechunks2_test.go @@ -1,7 +1,7 @@ package filer import ( - "sort" + "golang.org/x/exp/slices" "testing" "github.com/chrislusf/seaweedfs/weed/glog" @@ -34,11 +34,11 @@ func TestCompactFileChunksRealCase(t *testing.T) { } func printChunks(name string, chunks []*filer_pb.FileChunk) { - sort.Slice(chunks, func(i, j int) bool { - if chunks[i].Offset == chunks[j].Offset { - return chunks[i].Mtime < chunks[j].Mtime + slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool { + if a.Offset == b.Offset { + return a.Mtime < b.Mtime } - return chunks[i].Offset < chunks[j].Offset + return a.Offset < b.Offset }) for _, chunk := range chunks { glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go new file mode 100644 index 000000000..1d0bd837a --- /dev/null +++ b/weed/filer/filechunks_read.go @@ -0,0 +1,116 @@ +package filer + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "golang.org/x/exp/slices" +) + +func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { + + var points []*Point + for _, chunk := range chunks { + points = append(points, &Point{ + x: chunk.Offset, + ts: chunk.Mtime, + chunk: chunk, + isStart: true, + }) + points = append(points, &Point{ + x: chunk.Offset + int64(chunk.Size), + ts: chunk.Mtime, + chunk: chunk, + isStart: false, + }) + } + slices.SortFunc(points, func(a, b *Point) bool { + if a.x != b.x { + return a.x < b.x + } + if a.ts != b.ts { + return a.ts < b.ts + } + return !a.isStart + }) + + var prevX int64 + var queue []*Point + for _, point := range points { + if point.isStart { + if len(queue) > 0 { + lastIndex := len(queue) - 1 + lastPoint := queue[lastIndex] + if point.x != prevX && lastPoint.ts < point.ts { + visibles = addToVisibles(visibles, prevX, lastPoint, point) + prevX = point.x + } + } + // insert into queue + for i := len(queue); i >= 0; i-- { + if i == 0 || queue[i-1].ts <= point.ts { + if i == len(queue) { + prevX = point.x + } + queue = addToQueue(queue, i, point) + break + } + } + } else { + lastIndex := len(queue) - 1 + index := lastIndex + var startPoint *Point + for ; index >= 0; index-- { + startPoint = queue[index] + if startPoint.ts == point.ts { + queue = removeFromQueue(queue, index) + break + } + } + if index == lastIndex && startPoint != nil { + visibles = addToVisibles(visibles, prevX, startPoint, point) + prevX = point.x + } + } + } + + return +} + +func removeFromQueue(queue []*Point, index int) []*Point { + for i := index; i < len(queue)-1; i++ { + queue[i] = queue[i+1] + } + queue = queue[:len(queue)-1] + return queue +} + +func addToQueue(queue []*Point, index int, point *Point) []*Point { + queue = append(queue, point) + for i := len(queue) - 1; i > index; i-- { + queue[i], queue[i-1] = queue[i-1], queue[i] + } + return queue +} + +func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) 
[]VisibleInterval { + if prevX < point.x { + chunk := startPoint.chunk + visibles = append(visibles, VisibleInterval{ + start: prevX, + stop: point.x, + fileId: chunk.GetFileIdString(), + modifiedTime: chunk.Mtime, + chunkOffset: prevX - chunk.Offset, + chunkSize: chunk.Size, + cipherKey: chunk.CipherKey, + isGzipped: chunk.IsCompressed, + }) + } + return visibles +} + +type Point struct { + x int64 + ts int64 + chunk *filer_pb.FileChunk + isStart bool +} diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go new file mode 100644 index 000000000..e70c66e6f --- /dev/null +++ b/weed/filer/filechunks_read_test.go @@ -0,0 +1,210 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "math/rand" + "testing" +) + +func TestReadResolvedChunks(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + { + FileId: "a", + Offset: 0, + Size: 100, + Mtime: 1, + }, + { + FileId: "b", + Offset: 50, + Size: 100, + Mtime: 2, + }, + { + FileId: "c", + Offset: 200, + Size: 50, + Mtime: 3, + }, + { + FileId: "d", + Offset: 250, + Size: 50, + Mtime: 4, + }, + { + FileId: "e", + Offset: 175, + Size: 100, + Mtime: 5, + }, + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime) + } + +} + +func TestRandomizedReadResolvedChunks(t *testing.T) { + + var limit int64 = 1024 * 1024 + array := make([]int64, limit) + var chunks []*filer_pb.FileChunk + for ts := int64(0); ts < 1024; ts++ { + x := rand.Int63n(limit) + y := rand.Int63n(limit) + size := x - y + if size < 0 { + size = -size + } + if size > 1024 { + size = 1024 + } + start := x + if start > y { + start = y + } + chunks = append(chunks, randomWrite(array, start, size, ts)) + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + for i := visible.start; i < visible.stop; i++ { + if array[i] != visible.modifiedTime { + t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTime) + } + } + } + + // fmt.Printf("visibles %d", len(visibles)) + +} + +func randomWrite(array []int64, start int64, size int64, ts int64) *filer_pb.FileChunk { + for i := start; i < start+size; i++ { + array[i] = ts + } + // fmt.Printf("write [%d,%d) %d\n", start, start+size, ts) + return &filer_pb.FileChunk{ + FileId: "", + Offset: start, + Size: uint64(size), + Mtime: ts, + } +} + +func TestSequentialReadResolvedChunks(t *testing.T) { + + var chunkSize int64 = 1024 * 1024 * 2 + var chunks []*filer_pb.FileChunk + for ts := int64(0); ts < 13; ts++ { + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: "", + Offset: chunkSize * ts, + Size: uint64(chunkSize), + Mtime: 1, + }) + } + + visibles := readResolvedChunks(chunks) + + fmt.Printf("visibles %d", len(visibles)) + +} + +func TestActualReadResolvedChunks(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + { + FileId: "5,e7b96fef48", + Offset: 0, + Size: 2097152, + Mtime: 1634447487595823000, + }, + { + FileId: "5,e5562640b9", + Offset: 2097152, + Size: 2097152, + Mtime: 1634447487595826000, + }, + { + FileId: "5,df033e0fe4", + Offset: 4194304, + Size: 2097152, + Mtime: 1634447487595827000, + }, + { + FileId: "7,eb08148a9b", + Offset: 6291456, + Size: 2097152, + Mtime: 1634447487595827000, + }, + { + FileId: "7,e0f92d1604", + Offset: 8388608, + Size: 2097152, + Mtime: 1634447487595828000, + }, + { + FileId: "7,e33cb63262", + Offset: 10485760, + Size: 2097152, + Mtime: 1634447487595828000, 
+ }, + { + FileId: "5,ea98e40e93", + Offset: 12582912, + Size: 2097152, + Mtime: 1634447487595829000, + }, + { + FileId: "5,e165661172", + Offset: 14680064, + Size: 2097152, + Mtime: 1634447487595829000, + }, + { + FileId: "3,e692097486", + Offset: 16777216, + Size: 2097152, + Mtime: 1634447487595830000, + }, + { + FileId: "3,e28e2e3cbd", + Offset: 18874368, + Size: 2097152, + Mtime: 1634447487595830000, + }, + { + FileId: "3,e443974d4e", + Offset: 20971520, + Size: 2097152, + Mtime: 1634447487595830000, + }, + { + FileId: "2,e815bed597", + Offset: 23068672, + Size: 2097152, + Mtime: 1634447487595831000, + }, + { + FileId: "5,e94715199e", + Offset: 25165824, + Size: 1974736, + Mtime: 1634447487595832000, + }, + } + + visibles := readResolvedChunks(chunks) + + for _, visible := range visibles { + fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime) + } + +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 1a20abefc..86827c50e 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,7 +3,11 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "os" + "sort" "strings" "time" @@ -33,8 +37,6 @@ type Filer struct { fileIdDeletionQueue *util.UnboundedQueue GrpcDialOption grpc.DialOption DirBucketsPath string - FsyncBuckets []string - buckets *FilerBuckets Cipher bool LocalMetaLogBuffer *log_buffer.LogBuffer metaLogCollection string @@ -43,16 +45,18 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + UniqueFileId uint32 } -func NewFiler(masters []string, grpcDialOption grpc.DialOption, - filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer { +func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, + filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), + UniqueFileId: uint32(util.RandomInt32()), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection @@ -63,32 +67,69 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, return f } -func (f *Filer) AggregateFromPeers(self string, filers []string) { - - // set peers - found := false - for _, peer := range filers { - if peer == self { - found = true - } +func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) { + if len(existingNodes) == 0 { + return } - if !found { - filers = append(filers, self) + sort.Slice(existingNodes, func(i, j int) bool { + return existingNodes[i].CreatedAtNs < existingNodes[j].CreatedAtNs + }) + earliestNode := existingNodes[0] + if earliestNode.Address == string(self) { + return } - f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(f, self) + glog.V(0).Infof("bootstrap from %v", earliestNode.Address) + 
err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil, + 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error { + return Replay(f.Store, resp) + }, pb.FatalOnError) + return +} + +func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { + + f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) + f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate + + for _, peerUpdate := range existingNodes { + f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) + } } -func (f *Filer) SetStore(store FilerStore) { - f.Store = NewFilerStoreWrapper(store) +func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { + + if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + FilerGroup: f.MasterClient.FilerGroup, + }) + + glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) + for _, node := range resp.ClusterNodes { + existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ + NodeType: cluster.FilerType, + Address: node.Address, + IsLeader: node.IsLeader, + IsAdd: true, + CreatedAtNs: node.CreatedAtNs, + }) + } + return err + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr) + } + return +} - f.setOrLoadFilerStoreSignature(store) +func (f *Filer) SetStore(store FilerStore) (isFresh bool) { + f.Store = NewFilerStoreWrapper(store) + return f.setOrLoadFilerStoreSignature(store) } -func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) { +func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) (isFresh bool) { storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId)) if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 { f.Signature = util.RandomInt32() @@ -98,23 +139,25 @@ func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) { glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err) } glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature) + return true } else if err == nil && len(storeIdBytes) == 4 { f.Signature = int32(util.BytesToUint32(storeIdBytes)) glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature) } else { glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err) } + return false } func (f *Filer) GetStore() (store FilerStore) { return f.Store } -func (fs *Filer) GetMaster() string { +func (fs *Filer) GetMaster() pb.ServerAddress { return fs.MasterClient.GetMaster() } -func (fs *Filer) KeepConnectedToMaster() { +func (fs *Filer) KeepMasterClientConnected() { fs.MasterClient.KeepConnectedToMaster() } @@ -130,7 +173,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error { return f.Store.RollbackTransaction(ctx) } -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool) error { if string(entry.FullPath) == "/" { return nil @@ -148,9 +191,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr if oldEntry == nil { - dirParts := 
strings.Split(string(entry.FullPath), "/") - if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { - return err + if !skipCreateParentDir { + dirParts := strings.Split(string(entry.FullPath), "/") + if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } } glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name()) @@ -170,7 +215,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } } - f.maybeAddBucket(entry) f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) f.deleteChunksIfNotNew(oldEntry, entry) @@ -207,15 +251,13 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di dirEntry = &Entry{ FullPath: util.FullPath(dirPath), Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | entry.Mode | 0111, - Uid: entry.Uid, - Gid: entry.Gid, - Collection: entry.Collection, - Replication: entry.Replication, - UserName: entry.UserName, - GroupNames: entry.GroupNames, + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0111, + Uid: entry.Uid, + Gid: entry.Gid, + UserName: entry.UserName, + GroupNames: entry.GroupNames, }, } @@ -227,7 +269,6 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { - f.maybeAddBucket(dirEntry) f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) } @@ -285,14 +326,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { - if entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - f.Store.DeleteOneEntry(ctx, entry) - expiredCount++ - return true + select { + case <-ctx.Done(): + return false + default: + if entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.Store.DeleteOneEntry(ctx, entry) + expiredCount++ + return true + } } + return eachEntryFunc(entry) } - return eachEntryFunc(entry) }) if err != nil { return expiredCount, lastFileName, err diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go index 38a1abadb..e9cb3547e 100644 --- a/weed/filer/filer_buckets.go +++ b/weed/filer/filer_buckets.go @@ -1,76 +1,9 @@ package filer import ( - "context" - "math" "strings" - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" ) -type BucketName string -type BucketOption struct { - Name BucketName - Replication string - fsync bool -} -type FilerBuckets struct { - dirBucketsPath string - buckets map[BucketName]*BucketOption - sync.RWMutex -} - -func (f *Filer) LoadBuckets() { - - f.buckets = &FilerBuckets{ - buckets: make(map[BucketName]*BucketOption), - } - - limit := int64(math.MaxInt32) - - entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "", "") - - if err != nil { - glog.V(1).Infof("no buckets found: %v", err) - return - } - - shouldFsyncMap := make(map[string]bool) - for _, bucket := range 
f.FsyncBuckets { - shouldFsyncMap[bucket] = true - } - - glog.V(1).Infof("buckets found: %d", len(entries)) - - f.buckets.Lock() - for _, entry := range entries { - _, shouldFsnyc := shouldFsyncMap[entry.Name()] - f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{ - Name: BucketName(entry.Name()), - Replication: entry.Replication, - fsync: shouldFsnyc, - } - } - f.buckets.Unlock() - -} - -func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) { - - f.buckets.RLock() - defer f.buckets.RUnlock() - - option, found := f.buckets.buckets[BucketName(buketName)] - - if !found { - return "", false - } - return option.Replication, option.fsync - -} - func (f *Filer) isBucket(entry *Entry) bool { if !entry.IsDirectory() { return false @@ -83,43 +16,6 @@ func (f *Filer) isBucket(entry *Entry) bool { return false } - f.buckets.RLock() - defer f.buckets.RUnlock() - - _, found := f.buckets.buckets[BucketName(dirName)] - - return found - -} - -func (f *Filer) maybeAddBucket(entry *Entry) { - if !entry.IsDirectory() { - return - } - parent, dirName := entry.FullPath.DirAndName() - if parent != f.DirBucketsPath { - return - } - f.addBucket(dirName, &BucketOption{ - Name: BucketName(dirName), - Replication: entry.Replication, - }) -} - -func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) { - - f.buckets.Lock() - defer f.buckets.Unlock() - - f.buckets.buckets[BucketName(buketName)] = bucketOption - -} - -func (f *Filer) deleteBucket(buketName string) { - - f.buckets.Lock() - defer f.buckets.Unlock() - - delete(f.buckets.buckets, BucketName(buketName)) + return true } diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index c58b26dc2..32fc647d9 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -3,6 +3,10 @@ package filer import ( "bytes" "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "google.golang.org/grpc" "io" "github.com/chrislusf/seaweedfs/weed/glog" @@ -26,6 +30,29 @@ type FilerConf struct { rules ptrie.Trie } +func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) { + var buf bytes.Buffer + if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if masterClient != nil { + return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf) + } else { + content, err := ReadInsideFiler(client, DirectoryEtcSeaweedFS, FilerConfName) + buf = *bytes.NewBuffer(content) + return err + } + }); err != nil && err != filer_pb.ErrNotFound { + return nil, fmt.Errorf("read %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err) + } + + fc := NewFilerConf() + if buf.Len() > 0 { + if err := fc.LoadFromBytes(buf.Bytes()); err != nil { + return nil, fmt.Errorf("parse %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err) + } + } + return fc, nil +} + func NewFilerConf() (fc *FilerConf) { fc = &FilerConf{ rules: ptrie.New(), @@ -48,12 +75,12 @@ func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) { return fc.LoadFromBytes(entry.Content) } - return fc.loadFromChunks(filer, entry.Content, entry.Chunks) + return fc.loadFromChunks(filer, entry.Content, entry.Chunks, entry.Size()) } -func (fc *FilerConf) loadFromChunks(filer *Filer, content []byte, chunks []*filer_pb.FileChunk) (err error) { +func (fc *FilerConf) loadFromChunks(filer *Filer, content []byte, chunks []*filer_pb.FileChunk, size 
uint64) (err error) { if len(content) == 0 { - content, err = filer.readEntry(chunks) + content, err = filer.readEntry(chunks, size) if err != nil { glog.Errorf("read filer conf content: %v", err) return @@ -115,21 +142,32 @@ func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf return pathConf } +func (fc *FilerConf) GetCollectionTtls(collection string) (ttls map[string]string) { + ttls = make(map[string]string) + fc.rules.Walk(func(key []byte, value interface{}) bool { + t := value.(*filer_pb.FilerConf_PathConf) + if t.Collection == collection { + ttls[t.LocationPrefix] = t.GetTtl() + } + return true + }) + return ttls +} + // merge if values in b is not empty, merge them into a func mergePathConf(a, b *filer_pb.FilerConf_PathConf) { a.Collection = util.Nvl(b.Collection, a.Collection) a.Replication = util.Nvl(b.Replication, a.Replication) a.Ttl = util.Nvl(b.Ttl, a.Ttl) - if b.DiskType != "" { - a.DiskType = b.DiskType - } + a.DiskType = util.Nvl(b.DiskType, a.DiskType) a.Fsync = b.Fsync || a.Fsync if b.VolumeGrowthCount > 0 { a.VolumeGrowthCount = b.VolumeGrowthCount } - if b.ReadOnly { - a.ReadOnly = b.ReadOnly - } + a.ReadOnly = b.ReadOnly || a.ReadOnly + a.DataCenter = util.Nvl(b.DataCenter, a.DataCenter) + a.Rack = util.Nvl(b.Rack, a.Rack) + a.DataNode = util.Nvl(b.DataNode, a.DataNode) } func (fc *FilerConf) ToProto() *filer_pb.FilerConf { diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 35187d034..0fc2f6c3c 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -9,12 +9,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -type HardLinkId []byte - const ( MsgFailDelNonEmptyFolder = "fail to delete non-empty folder" ) +type OnChunksFunc func([]*filer_pb.FileChunk) error +type OnHardLinkIdsFunc func([]HardLinkId) error + func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { if p == "/" { return nil @@ -24,23 +25,30 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR if findErr != nil { return findErr } - isDeleteCollection := f.isBucket(entry) - - var chunks []*filer_pb.FileChunk - var hardLinkIds []HardLinkId - chunks = append(chunks, entry.Chunks...) if entry.IsDirectory() { // delete the folder children, not including the folder itself - var dirChunks []*filer_pb.FileChunk - var dirHardLinkIds []HardLinkId - dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures) + err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures, func(chunks []*filer_pb.FileChunk) error { + if shouldDeleteChunks && !isDeleteCollection { + f.DirectDeleteChunks(chunks) + } + return nil + }, func(hardLinkIds []HardLinkId) error { + // A case not handled: + // what if the chunk is in a different collection? + if shouldDeleteChunks { + f.maybeDeleteHardLinks(hardLinkIds) + } + return nil + }) if err != nil { glog.V(0).Infof("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err) } - chunks = append(chunks, dirChunks...) - hardLinkIds = append(hardLinkIds, dirHardLinkIds...) 
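
This hunk drops the dirChunks/dirHardLinkIds accumulation: instead of collecting every chunk in the subtree into slices and deleting them after the recursion returns, DeleteEntryMetaAndData now passes OnChunksFunc and OnHardLinkIdsFunc callbacks down, so deletion work is streamed as each directory page is listed and memory stays bounded on large trees. Below is a minimal sketch of that pattern; Chunk, Dir, and walkDelete are hypothetical stand-ins, not the filer's real types.

```go
// A minimal sketch (not the filer's real types) of the callback pattern this
// hunk adopts: the recursion hands each batch of chunks to a callback as soon
// as it is listed, instead of returning ever-growing slices up the stack.
package main

import "fmt"

type Chunk struct{ FileId string }

type Dir struct {
	Chunks   []Chunk
	Children []*Dir
}

// OnChunksFunc mirrors the shape of the new callback type in this diff.
type OnChunksFunc func([]Chunk) error

// walkDelete streams every node's chunks to onChunks, so the caller can
// enqueue deletions incrementally; memory stays O(depth), not O(tree).
func walkDelete(d *Dir, onChunks OnChunksFunc) error {
	for _, child := range d.Children {
		if err := walkDelete(child, onChunks); err != nil {
			return err
		}
	}
	return onChunks(d.Chunks)
}

func main() {
	root := &Dir{
		Chunks:   []Chunk{{FileId: "3,01"}},
		Children: []*Dir{{Chunks: []Chunk{{FileId: "4,02"}}}},
	}
	_ = walkDelete(root, func(chunks []Chunk) error {
		for _, c := range chunks {
			fmt.Println("enqueue delete", c.FileId) // the real code enqueues file ids for deletion
		}
		return nil
	})
}
```
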
+ } + + if shouldDeleteChunks && !isDeleteCollection { + f.DirectDeleteChunks(entry.Chunks) } // delete the file or folder @@ -49,25 +57,15 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR return fmt.Errorf("delete file %s: %v", p, err) } - if shouldDeleteChunks && !isDeleteCollection { - f.DirectDeleteChunks(chunks) - } - // A case not handled: - // what if the chunk is in a different collection? - if shouldDeleteChunks { - f.maybeDeleteHardLinks(hardLinkIds) - } - if isDeleteCollection { collectionName := entry.Name() f.doDeleteCollection(collectionName) - f.deleteBucket(collectionName) } return nil } -func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) { +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32, onChunksFn OnChunksFunc, onHardLinkIdsFn OnHardLinkIdsFunc) (err error) { lastFileName := "" includeLastFile := false @@ -76,34 +74,30 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "") if err != nil { glog.Errorf("list folder %s: %v", entry.FullPath, err) - return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) + return fmt.Errorf("list folder %s: %v", entry.FullPath, err) } if lastFileName == "" && !isRecursive && len(entries) > 0 { // only for first iteration in the loop - glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) - return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath) + glog.V(0).Infof("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) + return fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath) } for _, sub := range entries { lastFileName = sub.Name() - var dirChunks []*filer_pb.FileChunk - var dirHardLinkIds []HardLinkId if sub.IsDirectory() { subIsDeletingBucket := f.isBucket(sub) - dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil) - chunks = append(chunks, dirChunks...) - hardlinkIds = append(hardlinkIds, dirHardLinkIds...) + err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil, onChunksFn, onHardLinkIdsFn) } else { f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) if len(sub.HardLinkId) != 0 { // hard link chunk data are deleted separately - hardlinkIds = append(hardlinkIds, sub.HardLinkId) + err = onHardLinkIdsFn([]HardLinkId{sub.HardLinkId}) } else { - chunks = append(chunks, sub.Chunks...) 
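
The folder walk itself keeps its cursor-style pagination: each ListDirectoryEntries call resumes from lastFileName with includeLastFile false, and the walk advances one page of PaginationSize entries at a time. The sketch below shows that cursor pattern under simplifying assumptions; listPage is a hypothetical stand-in for the store listing call, and the termination check is a common convention, not necessarily the filer's exact one.

```go
// Sketch of cursor-based pagination: resume each page from the last file
// name seen, excluding that entry, until a short page signals the end.
package main

import "fmt"

const paginationSize = 2 // illustrative; the real code uses a larger page size

// listPage is a stand-in for the filer store's directory listing: it returns
// up to limit names at or after startFrom (exclusive unless include is set).
func listPage(all []string, startFrom string, include bool, limit int) []string {
	var out []string
	for _, name := range all {
		if name < startFrom || (!include && name == startFrom) {
			continue
		}
		out = append(out, name)
		if len(out) == limit {
			break
		}
	}
	return out
}

func main() {
	dir := []string{"a", "b", "c", "d", "e"} // names sorted, as stores return them
	lastFileName, includeLastFile := "", false
	for {
		page := listPage(dir, lastFileName, includeLastFile, paginationSize)
		for _, name := range page {
			lastFileName = name // advance the cursor
			fmt.Println("visit", name)
		}
		if len(page) < paginationSize {
			break // a short page means the directory is exhausted
		}
	}
}
```
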
+ err = onChunksFn(sub.Chunks) } } if err != nil && !ignoreRecursiveError { - return nil, nil, err + return err } } @@ -113,22 +107,26 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry } } - glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) + glog.V(3).Infof("deleting directory %v delete chunks: %v", entry.FullPath, shouldDeleteChunks) if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { - return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) + return fmt.Errorf("filer store delete: %v", storeDeletionErr) } f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) - return chunks, hardlinkIds, nil + return nil } func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) { glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) - if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil { + if !entry.IsDirectory() && !shouldDeleteChunks { + if storeDeletionErr := f.Store.DeleteOneEntrySkipHardlink(ctx, entry.FullPath); storeDeletionErr != nil { + return fmt.Errorf("filer store delete skip hardlink: %v", storeDeletionErr) + } + } else if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil { return fmt.Errorf("filer store delete: %v", storeDeletionErr) } if !entry.IsDirectory() { @@ -140,7 +138,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou func (f *Filer) doDeleteCollection(collectionName string) (err error) { - return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: collectionName, }) diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index a12587541..e73f94151 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -1,6 +1,7 @@ package filer import ( + "math" "strings" "time" @@ -129,6 +130,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { } } +func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) { + for _, chunk := range chunks { + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + } +} + func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { if oldEntry == nil { @@ -136,18 +143,41 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { } if newEntry == nil { f.DeleteChunks(oldEntry.Chunks) + return } var toDelete []*filer_pb.FileChunk newChunkIds := make(map[string]bool) - for _, newChunk := range newEntry.Chunks { + newDataChunks, newManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(), + newEntry.Chunks, 0, math.MaxInt64) + if err != nil { + glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. 
new: %s, old: %s", + newEntry.Chunks, oldEntry.Chunks) + return + } + for _, newChunk := range newDataChunks { + newChunkIds[newChunk.GetFileIdString()] = true + } + for _, newChunk := range newManifestChunks { newChunkIds[newChunk.GetFileIdString()] = true } - for _, oldChunk := range oldEntry.Chunks { + oldDataChunks, oldManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(), + oldEntry.Chunks, 0, math.MaxInt64) + if err != nil { + glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", + newEntry.Chunks, oldEntry.Chunks) + return + } + for _, oldChunk := range oldDataChunks { + if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { + toDelete = append(toDelete, oldChunk) + } + } + for _, oldChunk := range oldManifestChunks { if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(toDelete) + f.DeleteChunksNotRecursive(toDelete) } diff --git a/weed/filer/filer_hardlink.go b/weed/filer/filer_hardlink.go new file mode 100644 index 000000000..7a91602fd --- /dev/null +++ b/weed/filer/filer_hardlink.go @@ -0,0 +1,16 @@ +package filer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + HARD_LINK_MARKER = '\x01' +) + +type HardLinkId []byte // 16 bytes + 1 marker byte + +func NewHardLinkId() HardLinkId { + bytes := append(util.RandomBytes(16), HARD_LINK_MARKER) + return bytes +} diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 7ab101102..4d26a695c 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "strings" "time" @@ -92,14 +93,14 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { startTime, stopTime = startTime.UTC(), stopTime.UTC() - targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId, // startTime.Second(), startTime.Nanosecond(), ) for { if err := f.appendToFile(targetFile, buf); err != nil { - glog.V(1).Infof("log write failed %s: %v", targetFile, err) + glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) time.Sleep(737 * time.Millisecond) } else { break @@ -107,49 +108,67 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { } } -func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { +func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) { startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) + var stopDate, stopHourMinute string + if stopTsNs != 0 { + stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC() + stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day()) + stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute()) + } sizeBuf 
:= make([]byte, 4) startTsNs := startTime.UnixNano() - dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "", "") + dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "") if listDayErr != nil { - return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr) + return lastTsNs, isDone, fmt.Errorf("fail to list log by day: %v", listDayErr) } for _, dayEntry := range dayEntries { + if stopDate != "" { + if strings.Compare(dayEntry.Name(), stopDate) > 0 { + break + } + } // println("checking day", dayEntry.FullPath) - hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "") + hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "") if listHourMinuteErr != nil { - return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) + return lastTsNs, isDone, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } for _, hourMinuteEntry := range hourMinuteEntries { // println("checking hh-mm", hourMinuteEntry.FullPath) if dayEntry.Name() == startDate { - if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name()) + if strings.Compare(hourMinute, startHourMinute) < 0 { continue } } + if dayEntry.Name() == stopDate { + hourMinute := util.FileNameBase(hourMinuteEntry.Name()) + if strings.Compare(hourMinute, stopHourMinute) > 0 { + break + } + } // println("processing", hourMinuteEntry.FullPath) chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) - if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, stopTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { continue } - return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) + return lastTsNs, isDone, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) } chunkedFileReader.Close() } } - return lastTsNs, nil + return lastTsNs, isDone, nil } -func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { for { n, err := r.Read(sizeBuf) if err != nil { @@ -172,9 +191,12 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func if err = proto.Unmarshal(entryData, logEntry); err != nil { return lastTsNs, err } - if logEntry.TsNs <= ns { + if logEntry.TsNs <= startTsNs { continue } + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + return lastTsNs, err + } // println("each log: ", logEntry.TsNs) if err := eachLogEntryFn(logEntry); err != nil { return lastTsNs, err diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index d441bbbc9..25b99d0f7 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -33,6 +33,8 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { Gid: OS_GID, }, } + } else if err != 
nil { + return fmt.Errorf("find %s: %v", fullpath, err) } else { offset = int64(TotalSize(entry.Chunks)) } @@ -41,7 +43,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) // update the entry - err = f.CreateEntry(context.Background(), entry, false, false, nil) + err = f.CreateEntry(context.Background(), entry, false, false, nil, false) return err } @@ -66,7 +68,16 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi // upload data targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: "", + Cipher: f.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: assignResult.Auth, + } + uploadResult, err := operation.UploadData(data, uploadOption) if err != nil { return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) } diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 34ac5321a..3b290deca 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -2,8 +2,6 @@ package filer import ( "bytes" - "math" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -24,12 +22,12 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { } } if f.DirBucketsPath == event.Directory { - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(event) { if message.NewEntry.IsDirectory { f.Store.OnBucketCreation(message.NewEntry.Name) } } - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(event) { if message.OldEntry.IsDirectory { f.Store.OnBucketDeletion(message.OldEntry.Name) } @@ -55,9 +53,9 @@ func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataR } } -func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { +func (f *Filer) readEntry(chunks []*filer_pb.FileChunk, size uint64) ([]byte, error) { var buf bytes.Buffer - err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64) + err := StreamContent(f.MasterClient, &buf, chunks, 0, int64(size)) if err != nil { return nil, err } @@ -66,7 +64,7 @@ func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) { fc := NewFilerConf() - err := fc.loadFromChunks(f, entry.Content, entry.Chunks) + err := fc.loadFromChunks(f, entry.Content, entry.Chunks, FileSize(entry)) if err != nil { glog.Errorf("read filer conf chunks: %v", err) return diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 2e0336da8..112df7984 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -23,15 +23,15 @@ func splitPattern(pattern string) (prefix string, restPattern string) { // For now, prefix and namePattern are mutually exclusive func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string) (entries []*Entry, hasMore bool, err error) { + if limit > math.MaxInt32-1 { + limit = math.MaxInt32 - 1 + } + _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, 
namePatternExclude, func(entry *Entry) bool { entries = append(entries, entry) return true }) - if limit == math.MaxInt64 { - limit = math.MaxInt64 - 1 - } - hasMore = int64(len(entries)) >= limit+1 if hasMore { entries = entries[:limit] diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 38927d6fb..260945b33 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -4,8 +4,11 @@ import ( "context" "errors" "github.com/chrislusf/seaweedfs/weed/util" + "io" ) +const CountEntryChunksForGzip = 50 + var ( ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") @@ -45,3 +48,7 @@ type BucketAware interface { OnBucketDeletion(bucket string) CanDropWholeBucket() bool } + +type Debuggable interface { + Debug(writer io.Writer) +} diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go index 316c76a0c..ae2f5fee7 100644 --- a/weed/filer/filerstore_hardlink.go +++ b/weed/filer/filerstore_hardlink.go @@ -9,16 +9,20 @@ import ( ) func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error { - if len(entry.HardLinkId) == 0 { + + if entry.IsDirectory() { return nil } - // handle hard links - if err := fsw.setHardLink(ctx, entry); err != nil { - return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err) + + if len(entry.HardLinkId) > 0 { + // handle hard links + if err := fsw.setHardLink(ctx, entry); err != nil { + return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err) + } } // check what is existing entry - glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) + // glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) actualStore := fsw.getActualStore(entry.FullPath) existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath) if err != nil && err != filer_pb.ErrNotFound { @@ -46,6 +50,8 @@ func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) err return encodeErr } + glog.V(4).Infof("setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter) + return fsw.KvPut(ctx, key, newBlob) } @@ -55,7 +61,6 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr } key := entry.HardLinkId - glog.V(4).Infof("maybeReadHardLink KvGet %v", key) value, err := fsw.KvGet(ctx, key) if err != nil { glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) @@ -67,6 +72,8 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr return err } + glog.V(4).Infof("maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter) + return nil } diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go index 00bf82ed4..9e74dd41c 100644 --- a/weed/filer/filerstore_translate_path.go +++ b/weed/filer/filerstore_translate_path.go @@ -3,20 +3,21 @@ package filer import ( "context" "github.com/chrislusf/seaweedfs/weed/util" + "math" "strings" ) var ( - _ = FilerStore(&FilerStorePathTranlator{}) + _ = FilerStore(&FilerStorePathTranslator{}) ) -type FilerStorePathTranlator struct { +type FilerStorePathTranslator struct { actualStore FilerStore storeRoot string } -func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator { - if innerStore, ok := store.(*FilerStorePathTranlator); ok { +func NewFilerStorePathTranslator(storeRoot string, store FilerStore) *FilerStorePathTranslator { + if 
innerStore, ok := store.(*FilerStorePathTranslator); ok { return innerStore } @@ -24,13 +25,13 @@ func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStoreP storeRoot += "/" } - return &FilerStorePathTranlator{ + return &FilerStorePathTranslator{ actualStore: store, storeRoot: storeRoot, } } -func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) { +func (t *FilerStorePathTranslator) translatePath(fp util.FullPath) (newPath util.FullPath) { newPath = fp if t.storeRoot == "/" { return @@ -41,7 +42,7 @@ func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util. } return } -func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) { +func (t *FilerStorePathTranslator) changeEntryPath(entry *Entry) (previousPath util.FullPath) { previousPath = entry.FullPath if t.storeRoot == "/" { return @@ -49,33 +50,33 @@ func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath ut entry.FullPath = t.translatePath(previousPath) return } -func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) { +func (t *FilerStorePathTranslator) recoverEntryPath(entry *Entry, previousPath util.FullPath) { entry.FullPath = previousPath } -func (t *FilerStorePathTranlator) GetName() string { +func (t *FilerStorePathTranslator) GetName() string { return t.actualStore.GetName() } -func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error { +func (t *FilerStorePathTranslator) Initialize(configuration util.Configuration, prefix string) error { return t.actualStore.Initialize(configuration, prefix) } -func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error { +func (t *FilerStorePathTranslator) InsertEntry(ctx context.Context, entry *Entry) error { previousPath := t.changeEntryPath(entry) defer t.recoverEntryPath(entry, previousPath) return t.actualStore.InsertEntry(ctx, entry) } -func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error { +func (t *FilerStorePathTranslator) UpdateEntry(ctx context.Context, entry *Entry) error { previousPath := t.changeEntryPath(entry) defer t.recoverEntryPath(entry, previousPath) return t.actualStore.UpdateEntry(ctx, entry) } -func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { +func (t *FilerStorePathTranslator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { if t.storeRoot == "/" { return t.actualStore.FindEntry(ctx, fp) } @@ -87,12 +88,12 @@ func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPat return } -func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { +func (t *FilerStorePathTranslator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { newFullPath := t.translatePath(fp) return t.actualStore.DeleteEntry(ctx, newFullPath) } -func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { +func (t *FilerStorePathTranslator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { previousPath := t.changeEntryPath(existingEntry) defer t.recoverEntryPath(existingEntry, previousPath) @@ -100,13 +101,13 @@ func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEn return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath) } -func (t *FilerStorePathTranlator) 
DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { +func (t *FilerStorePathTranslator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { newFullPath := t.translatePath(fp) return t.actualStore.DeleteFolderChildren(ctx, newFullPath) } -func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { +func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { newFullPath := t.translatePath(dirPath) @@ -116,38 +117,42 @@ func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirP }) } -func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) { +func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) { newFullPath := t.translatePath(dirPath) + if limit > math.MaxInt32-1 { + limit = math.MaxInt32 - 1 + } + return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath return eachEntryFunc(entry) }) } -func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) { +func (t *FilerStorePathTranslator) BeginTransaction(ctx context.Context) (context.Context, error) { return t.actualStore.BeginTransaction(ctx) } -func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error { +func (t *FilerStorePathTranslator) CommitTransaction(ctx context.Context) error { return t.actualStore.CommitTransaction(ctx) } -func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error { +func (t *FilerStorePathTranslator) RollbackTransaction(ctx context.Context) error { return t.actualStore.RollbackTransaction(ctx) } -func (t *FilerStorePathTranlator) Shutdown() { +func (t *FilerStorePathTranslator) Shutdown() { t.actualStore.Shutdown() } -func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) { +func (t *FilerStorePathTranslator) KvPut(ctx context.Context, key []byte, value []byte) (err error) { return t.actualStore.KvPut(ctx, key, value) } -func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) { +func (t *FilerStorePathTranslator) KvGet(ctx context.Context, key []byte) (value []byte, err error) { return t.actualStore.KvGet(ctx, key) } -func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) { +func (t *FilerStorePathTranslator) KvDelete(ctx context.Context, key []byte) (err error) { return t.actualStore.KvDelete(ctx, key) } diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 2470f340c..3ece25ce6 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -4,6 +4,8 @@ import ( "context" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/viant/ptrie" + "io" + "math" "strings" "time" @@ -14,12 +16,14 @@ import ( var ( _ = 
VirtualFilerStore(&FilerStoreWrapper{}) + _ = Debuggable(&FilerStoreWrapper{}) ) type VirtualFilerStore interface { FilerStore DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error + DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error AddPathSpecificStore(path string, storeId string, store FilerStore) OnBucketCreation(bucket string) OnBucketDeletion(bucket string) @@ -72,7 +76,7 @@ func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) { } func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { - fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store) + fsw.storeIdToStore[storeId] = NewFilerStorePathTranslator(path, store) err := fsw.pathToStore.Put([]byte(path), storeId) if err != nil { glog.Fatalf("put path specific store: %v", err) @@ -124,7 +128,7 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err return err } - glog.V(4).Infof("InsertEntry %s", entry.FullPath) + // glog.V(4).Infof("InsertEntry %s", entry.FullPath) return actualStore.InsertEntry(ctx, entry) } @@ -145,7 +149,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err return err } - glog.V(4).Infof("UpdateEntry %s", entry.FullPath) + // glog.V(4).Infof("UpdateEntry %s", entry.FullPath) return actualStore.UpdateEntry(ctx, entry) } @@ -189,7 +193,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) } } - glog.V(4).Infof("DeleteEntry %s", fp) + // glog.V(4).Infof("DeleteEntry %s", fp) return actualStore.DeleteEntry(ctx, fp) } @@ -209,10 +213,22 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry } } - glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) + // glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) return actualStore.DeleteEntry(ctx, existingEntry.FullPath) } +func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) { + actualStore := fsw.getActualStore(fullpath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath) + return actualStore.DeleteEntry(ctx, fullpath) +} + func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { actualStore := fsw.getActualStore(fp + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() @@ -221,7 +237,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util. 
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) }() - glog.V(4).Infof("DeleteFolderChildren %s", fp) + // glog.V(4).Infof("DeleteFolderChildren %s", fp) return actualStore.DeleteFolderChildren(ctx, fp) } @@ -233,7 +249,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) }() - glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) + // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) @@ -248,14 +264,18 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, defer func() { stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) }() - glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) - lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc) + if limit > math.MaxInt32-1 { + limit = math.MaxInt32 - 1 + } + // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) + adjustedEntryFunc := func(entry *Entry) bool { + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) + return eachEntryFunc(entry) + } + lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc) if err == ErrUnsupportedListDirectoryPrefixed { - lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool { - fsw.maybeReadHardLink(ctx, entry) - filer_pb.AfterEntryDeserialization(entry.Chunks) - return eachEntryFunc(entry) - }) + lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc) } return lastFileName, err } @@ -278,8 +298,10 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u count := int64(0) for count < limit && len(notPrefixed) > 0 { + var isLastItemHasPrefix bool for _, entry := range notPrefixed { if strings.HasPrefix(entry.Name(), prefix) { + isLastItemHasPrefix = true count++ if !eachEntryFunc(entry) { return @@ -287,17 +309,21 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u if count >= limit { break } + } else { + isLastItemHasPrefix = false } } - if count < limit { + if count < limit && isLastItemHasPrefix && len(notPrefixed) == int(limit) { notPrefixed = notPrefixed[:0] - _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { + lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { notPrefixed = append(notPrefixed, entry) return true }) if err != nil { return } + } else { + break } } return @@ -328,3 +354,9 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { return 
fsw.getDefaultStore().KvDelete(ctx, key) } + +func (fsw *FilerStoreWrapper) Debug(writer io.Writer) { + if debuggable, ok := fsw.getDefaultStore().(Debuggable); ok { + debuggable.Debug(writer) + } +} diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index e0d878ca7..c5d6eb48c 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -75,7 +75,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er if err != nil { return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index ce454f36a..6abb37f99 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/syndtr/goleveldb/leveldb" leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + "io" "os" "github.com/chrislusf/seaweedfs/weed/filer" @@ -20,6 +22,10 @@ const ( DIR_FILE_SEPARATOR = byte(0x00) ) +var ( + _ = filer.Debuggable(&LevelDBStore{}) +) + func init() { filer.Stores = append(filer.Stores, &LevelDBStore{}) } @@ -45,9 +51,9 @@ func (store *LevelDBStore) initialize(dir string) (err error) { } opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 10, + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + Filter: filter.NewBloomFilter(8), // false positive rate 0.02 } if store.db, err = leveldb.OpenFile(dir, opts); err != nil { @@ -80,7 +86,7 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } @@ -241,3 +247,13 @@ func getNameFromKey(key []byte) string { func (store *LevelDBStore) Shutdown() { store.db.Close() } + +func (store *LevelDBStore) Debug(writer io.Writer) { + iter := store.db.NewIterator(&leveldb_util.Range{}, nil) + for iter.Next() { + key := iter.Key() + fullName := bytes.Replace(key, []byte{DIR_FILE_SEPARATOR}, []byte{' '}, 1) + fmt.Fprintf(writer, "%v\n", string(fullName)) + } + iter.Release() +} diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index d437895f5..4cd8b88e8 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -3,7 +3,6 @@ package leveldb import ( "context" "fmt" - "io/ioutil" "os" "testing" "time" @@ -13,9 +12,8 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -33,7 +31,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, 
false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -67,9 +65,8 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -90,9 +87,8 @@ func TestEmptyRoot(t *testing.T) { } func BenchmarkInsertEntry(b *testing.B) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := b.TempDir() store := &LevelDBStore{} store.initialize(dir) testFiler.SetStore(store) diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 4c4409c4d..d68493bd7 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -46,10 +46,9 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { } opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - Filter: filter.NewBloomFilter(8), // false positive rate 0.02 + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + Filter: filter.NewBloomFilter(8), // false positive rate 0.02 } for d := 0; d < dbCount; d++ { @@ -89,7 +88,7 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go index fd0ad18a3..1f8e33116 100644 --- a/weed/filer/leveldb2/leveldb2_store_test.go +++ b/weed/filer/leveldb2/leveldb2_store_test.go @@ -2,8 +2,6 @@ package leveldb import ( "context" - "io/ioutil" - "os" "testing" "github.com/chrislusf/seaweedfs/weed/filer" @@ -11,9 +9,8 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) testFiler.SetStore(store) @@ -31,7 +28,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -65,9 +62,8 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) testFiler.SetStore(store) diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index bc57a6605..d21515bd4 100644 --- 
a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -66,17 +66,15 @@ func (store *LevelDB3Store) initialize(dir string) (err error) { func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) { bloom := filter.NewBloomFilter(8) // false positive rate 0.02 opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - Filter: bloom, + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + Filter: bloom, } if name != DEFAULT { opts = &opt.Options{ - BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - Filter: bloom, + BlockCacheCapacity: 16 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 8 * 1024 * 1024, // default value is 4MiB + Filter: bloom, } } @@ -179,7 +177,7 @@ func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go index 0b970a539..823c3a1bf 100644 --- a/weed/filer/leveldb3/leveldb3_store_test.go +++ b/weed/filer/leveldb3/leveldb3_store_test.go @@ -2,8 +2,6 @@ package leveldb import ( "context" - "io/ioutil" - "os" "testing" "github.com/chrislusf/seaweedfs/weed/filer" @@ -11,9 +9,8 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) testFiler.SetStore(store) @@ -31,7 +28,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -65,9 +62,8 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) testFiler.SetStore(store) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 913cbd454..fb96ee01b 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,6 +3,8 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" "sync" @@ -18,9 +20,13 @@ import ( ) type MetaAggregator struct { - filers []string - grpcDialOption grpc.DialOption - MetaLogBuffer *log_buffer.LogBuffer + filer *Filer + self pb.ServerAddress + isLeader bool + grpcDialOption grpc.DialOption + MetaLogBuffer *log_buffer.LogBuffer + peerStatues map[pb.ServerAddress]int + peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond @@ -28,10 +34,12 @@ 
type MetaAggregator struct { // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. // The old data comes from what each LocalMetadata persisted on disk. -func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator { +func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator { t := &MetaAggregator{ - filers: filers, + filer: filer, + self: self, grpcDialOption: grpcDialOption, + peerStatues: make(map[pb.ServerAddress]int), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -40,13 +48,66 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg return t } -func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { - for _, filer := range ma.filers { - go ma.subscribeToOneFiler(f, self, filer) +func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return } + + address := pb.ServerAddress(update.Address) + if update.IsAdd { + // every filer should subscribe to a new filer + if ma.setActive(address, true) { + go ma.loopSubscribeToOnefiler(ma.filer, ma.self, address, startFrom) + } + } else { + ma.setActive(address, false) + } +} + +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { + ma.peerStatuesLock.Lock() + defer ma.peerStatuesLock.Unlock() + if isActive { + if _, found := ma.peerStatues[address]; found { + ma.peerStatues[address] += 1 + } else { + ma.peerStatues[address] = 1 + notDuplicated = true + } + } else { + if _, found := ma.peerStatues[address]; found { + delete(ma.peerStatues, address) + } + } + return +} +func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { + ma.peerStatuesLock.Lock() + defer ma.peerStatuesLock.Unlock() + var count int + count, isActive = ma.peerStatues[address] + return count > 0 && isActive } -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) { +func (ma *MetaAggregator) loopSubscribeToOnefiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) { + lastTsNs := startFrom.UnixNano() + for { + glog.V(0).Infof("loopSubscribeToOnefiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs) + nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs) + if !ma.isActive(peer) { + glog.V(0).Infof("stop subscribing remote %s meta change", peer) + return + } + if err != nil { + glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) + } else if lastTsNs < nextLastTsNs { + lastTsNs = nextLastTsNs + } + time.Sleep(1733 * time.Millisecond) + } +} + +func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom int64) (int64, error) { /* Each filer reads the "filer.store.id", which is the store's signature when filer starts. 
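
OnPeerUpdate turns master-pushed cluster updates into subscription lifecycle events: the first add of a filer address starts a loopSubscribeToOnefiler goroutine, and a removal flips the peer inactive so the loop exits on its next check. The sketch below reduces that idea to a mutex-guarded registry with string addresses; it is an illustration of the mechanism, not the real MetaAggregator.

```go
// Sketch of the peer registry behind OnPeerUpdate: the first add of an
// address wins and spawns one subscription loop; removal makes isActive
// report false so the loop shuts itself down.
package main

import (
	"fmt"
	"sync"
	"time"
)

type tracker struct {
	mu    sync.Mutex
	peers map[string]int
}

// setActive returns true only for the first add of an address, so callers
// start at most one subscription goroutine per peer.
func (t *tracker) setActive(addr string, active bool) (first bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if active {
		if _, ok := t.peers[addr]; !ok {
			first = true
		}
		t.peers[addr]++
	} else {
		delete(t.peers, addr)
	}
	return
}

func (t *tracker) isActive(addr string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.peers[addr] > 0
}

func main() {
	t := &tracker{peers: map[string]int{}}
	if t.setActive("filer2:8888", true) {
		go func() {
			for t.isActive("filer2:8888") {
				fmt.Println("subscribed; would retry on error")
				time.Sleep(10 * time.Millisecond)
			}
			fmt.Println("peer removed, subscription loop exits")
		}()
	}
	time.Sleep(25 * time.Millisecond)
	t.setActive("filer2:8888", false)
	time.Sleep(20 * time.Millisecond)
}
```
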
@@ -60,20 +121,26 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) lastPersistTime := time.Now() - lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() + lastTsNs := startFrom peerSignature, err := ma.readFilerStoreSignature(peer) - for err != nil { - glog.V(0).Infof("connecting to peer filer %s: %v", peer, err) - time.Sleep(1357 * time.Millisecond) - peerSignature, err = ma.readFilerStoreSignature(peer) + if err != nil { + return lastTsNs, fmt.Errorf("connecting to peer filer %s: %v", peer, err) } // when filer store is not shared by multiple filers if peerSignature != f.Signature { - lastTsNs = 0 if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil { lastTsNs = prevTsNs + defer func(prevTsNs int64) { + if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() { + if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil { + glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) + } else { + glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) + } + } + }(prevTsNs) } glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) @@ -110,54 +177,50 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0) + ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } return nil } - for { - glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs)) - err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + self, - PathPrefix: "/", - SinceNs: lastTsNs, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } + glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs)) + err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "filer:" + string(self), + PathPrefix: "/", + SinceNs: lastTsNs, + ClientId: int32(ma.filer.UniqueFileId), + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } - if err := processEventFn(resp); err != nil { - return fmt.Errorf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs + if err := processEventFn(resp); err != nil { + return fmt.Errorf("process %v: %v", resp, err) + } + lastTsNs = resp.TsNs - f.onMetadataChangeEvent(resp) + f.onMetadataChangeEvent(resp) - } - }) - if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) - time.Sleep(1733 * time.Millisecond) } - } + }) + return lastTsNs, err } -func (ma *MetaAggregator) 
readFilerStoreSignature(peer string) (sig int32, err error) { - err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) { + err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err @@ -172,18 +235,13 @@ const ( MetaOffsetPrefix = "Meta" ) -func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) { +func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) { key := []byte(MetaOffsetPrefix + "xxxx") util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) value, err := f.Store.KvGet(context.Background(), key) - if err == ErrKvNotFound { - glog.Warningf("readOffset %s not found", peer) - return 0, nil - } - if err != nil { return 0, fmt.Errorf("readOffset %s : %v", peer, err) } @@ -195,7 +253,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) return } -func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) { +func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) { key := []byte(MetaOffsetPrefix + "xxxx") util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 1ef5056f4..83686bfe7 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -107,7 +107,7 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } @@ -159,7 +159,7 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa dir, name := fullpath.DirAndName() where := bson.M{"directory": dir, "name": name} - _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where) + _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) } @@ -193,11 +193,15 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti optLimit := int64(limit) opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}} cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts) + if err != nil { + return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) + } + for cur.Next(ctx) { var data Model - err := cur.Decode(&data) - if err != nil && err != mongo.ErrNoDocuments { - return lastFileName, err + err = cur.Decode(&data) + if err != nil { + break } entry := &filer.Entry{ diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go index a1f54455a..e50480150 100644 --- a/weed/filer/mysql2/mysql2_store.go +++ b/weed/filer/mysql2/mysql2_store.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -82,7 +83,7 @@ func (store *MysqlStore2) 
initialize(createTable, upsertQuery string, enableUpse return fmt.Errorf("connect to %s error:%v", sqlUrl, err) } - if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { + if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil && !strings.Contains(err.Error(), "table already exist") { return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) } diff --git a/weed/filer/permission.go b/weed/filer/permission.go deleted file mode 100644 index 0d8b8292b..000000000 --- a/weed/filer/permission.go +++ /dev/null @@ -1,22 +0,0 @@ -package filer - -func hasWritePermission(dir *Entry, entry *Entry) bool { - - if dir == nil { - return false - } - - if dir.Uid == entry.Uid && dir.Mode&0200 > 0 { - return true - } - - if dir.Gid == entry.Gid && dir.Mode&0020 > 0 { - return true - } - - if dir.Mode&0002 > 0 { - return true - } - - return false -} diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index 3406246a9..6372dac72 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -21,13 +21,13 @@ func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMou return remoteLocation } -func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string)(fp util.FullPath) { +func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string) (fp util.FullPath) { return localMountedDir.Child(remoteLocationPath[len(remoteMountedLocation.Path):]) } -func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - _, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{ +func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: string(parent), Name: entry.Name, }) diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 14e8cab1e..e34034eb6 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -4,7 +4,6 @@ import ( "bytes" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/wdclient" - "math" "time" ) @@ -23,7 +22,7 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed return err } - return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, int64(FileSize(respLookupEntry.Entry))) } @@ -54,15 +53,14 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte Name: name, IsDirectory: false, Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0644), - Collection: "", - Replication: "", - FileSize: uint64(len(content)), + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0644), + 
FileSize: uint64(len(content)), }, Content: content, }, + SkipCheckParentDirectory: true, }) } else if err == nil { entry := resp.Entry diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 458cf88be..7d9997761 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -12,20 +12,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/golang/groupcache/singleflight" ) type ChunkReadAt struct { - masterClient *wdclient.MasterClient - chunkViews []*ChunkView - lookupFileId wdclient.LookupFileIdFunctionType - readerLock sync.Mutex - fileSize int64 - - fetchGroup singleflight.Group - chunkCache chunk_cache.ChunkCache - lastChunkFileId string - lastChunkData []byte + masterClient *wdclient.MasterClient + chunkViews []*ChunkView + readerLock sync.Mutex + fileSize int64 + readerCache *ReaderCache + readerPattern *ReaderPattern + lastChunkFid string } var _ = io.ReaderAt(&ChunkReadAt{}) @@ -43,7 +39,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp if !found { util.Retry("lookup volume "+vid, func() error { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) @@ -88,21 +84,22 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ - chunkViews: chunkViews, - lookupFileId: lookupFn, - chunkCache: chunkCache, - fileSize: fileSize, + chunkViews: chunkViews, + fileSize: fileSize, + readerCache: newReaderCache(32, chunkCache, lookupFn), + readerPattern: NewReaderPattern(), } } func (c *ChunkReadAt) Close() error { - c.lastChunkData = nil - c.lastChunkFileId = "" + c.readerCache.destroy() return nil } func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { + c.readerPattern.MonitorReadAt(offset, len(p)) + c.readerLock.Lock() defer c.readerLock.Unlock() @@ -113,19 +110,17 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { startOffset, remaining := offset, int64(len(p)) - var nextChunk *ChunkView + var nextChunks []*ChunkView for i, chunk := range c.chunkViews { if remaining <= 0 { break } if i+1 < len(c.chunkViews) { - nextChunk = c.chunkViews[i+1] - } else { - nextChunk = nil + nextChunks = c.chunkViews[i+1:] } if startOffset < chunk.LogicOffset { gap := int(chunk.LogicOffset - startOffset) - glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap)) + glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset) n += int(min(int64(gap), remaining)) startOffset, remaining = chunk.LogicOffset, remaining-int64(gap) if remaining <= 0 { @@ -138,16 +133,13 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { continue } // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - var buffer []byte bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset - bufferLength := chunkStop - 
chunkStart - buffer, err = c.readChunkSlice(chunk, nextChunk, uint64(bufferOffset), uint64(bufferLength)) + copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset)) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - return + return copied, err } - copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer) n += copied startOffset, remaining = startOffset+int64(copied), remaining-int64(copied) } @@ -169,77 +161,25 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) { - - chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length) - if len(chunkSlice) > 0 { - return chunkSlice, nil - } - chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews) - if err != nil { - return nil, err - } - wanted := min(int64(length), int64(len(chunkData))-int64(offset)) - return chunkData[offset : int64(offset)+wanted], nil -} - -func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) { - - if c.lastChunkFileId == chunkView.FileId { - return c.lastChunkData, nil - } - - v, doErr := c.readOneWholeChunk(chunkView) +func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) { - if doErr != nil { - return nil, doErr + if c.readerPattern.IsRandomMode() { + return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset)) } - chunkData = v.([]byte) - - c.lastChunkData = chunkData - c.lastChunkFileId = chunkView.FileId - - for _, nextChunkView := range nextChunkViews { - if c.chunkCache != nil && nextChunkView != nil { - go c.readOneWholeChunk(nextChunkView) + n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0) + if c.lastChunkFid != chunkView.FileId { + if chunkView.Offset == 0 { // start of a new chunk + if c.lastChunkFid != "" { + c.readerCache.UnCache(c.lastChunkFid) + c.readerCache.MaybeCache(nextChunkViews) + } else { + if len(nextChunkViews) >= 1 { + c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning + } + } } } - + c.lastChunkFid = chunkView.FileId return } - -func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) { - - var err error - - return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) { - - glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) - - data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) - if data != nil { - glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data))) - } else { - var err error - data, err = c.doFetchFullChunkData(chunkView) - if err != nil { - return data, err - } - c.chunkCache.SetChunk(chunkView.FileId, data) - } - return data, err - }) -} - -func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) { - - glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId) - 
- data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) - - glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId) - - return data, err - -} diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go index f8e4727ce..d9afb460c 100644 --- a/weed/filer/reader_at_test.go +++ b/weed/filer/reader_at_test.go @@ -21,8 +21,12 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { return data } -func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte { - return nil +func (m *mockChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) { + x, _ := strconv.Atoi(fileId) + for i := 0; i < len(data); i++ { + data[i] = byte(x) + } + return len(data), nil } func (m *mockChunkCache) SetChunk(fileId string, data []byte) { @@ -64,11 +68,11 @@ func TestReaderAt(t *testing.T) { } readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 10, - chunkCache: &mockChunkCache{}, + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + readerLock: sync.Mutex{}, + fileSize: 10, + readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerPattern: NewReaderPattern(), } testReadAt(t, readerAt, 0, 10, 10, io.EOF) @@ -80,7 +84,7 @@ func TestReaderAt(t *testing.T) { func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) { data := make([]byte, size) - n, err := readerAt.ReadAt(data, offset) + n, err := readerAt.doReadAt(data, offset) for _, d := range data { fmt.Printf("%x", d) @@ -114,11 +118,11 @@ func TestReaderAt0(t *testing.T) { } readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 10, - chunkCache: &mockChunkCache{}, + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + readerLock: sync.Mutex{}, + fileSize: 10, + readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerPattern: NewReaderPattern(), } testReadAt(t, readerAt, 0, 10, 10, io.EOF) @@ -142,11 +146,11 @@ func TestReaderAt1(t *testing.T) { } readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 20, - chunkCache: &mockChunkCache{}, + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + readerLock: sync.Mutex{}, + fileSize: 20, + readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerPattern: NewReaderPattern(), } testReadAt(t, readerAt, 0, 20, 20, io.EOF) diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go new file mode 100644 index 000000000..bce97cc49 --- /dev/null +++ b/weed/filer/reader_cache.go @@ -0,0 +1,192 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "sync" + "time" +) + +type ReaderCache struct { + chunkCache chunk_cache.ChunkCache + lookupFileIdFn wdclient.LookupFileIdFunctionType + sync.Mutex + downloaders map[string]*SingleChunkCacher + limit int +} + +type SingleChunkCacher struct { + sync.RWMutex + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + completedTime time.Time +} + +func newReaderCache(limit int, 
chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { + return &ReaderCache{ + limit: limit, + chunkCache: chunkCache, + lookupFileIdFn: lookupFileIdFn, + downloaders: make(map[string]*SingleChunkCacher), + } +} + +func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { + if rc.lookupFileIdFn == nil { + return + } + + rc.Lock() + defer rc.Unlock() + + for _, chunkView := range chunkViews { + if _, found := rc.downloaders[chunkView.FileId]; found { + continue + } + + if len(rc.downloaders) >= rc.limit { + // if still no slots, return + return + } + + // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset) + // cache this chunk if not yet + cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false) + cacher.wg.Add(1) + go cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[chunkView.FileId] = cacher + + } + + return +} + +func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { + rc.Lock() + defer rc.Unlock() + if cacher, found := rc.downloaders[fileId]; found { + return cacher.readChunkAt(buffer, offset) + } + if shouldCache || rc.lookupFileIdFn == nil { + n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) + if n > 0 { + return n, err + } + } + + if len(rc.downloaders) >= rc.limit { + oldestFid, oldestTime := "", time.Now() + for fid, downloader := range rc.downloaders { + if !downloader.completedTime.IsZero() { + if downloader.completedTime.Before(oldestTime) { + oldestFid, oldestTime = fid, downloader.completedTime + } + } + } + if oldestFid != "" { + oldDownloader := rc.downloaders[oldestFid] + delete(rc.downloaders, oldestFid) + oldDownloader.destroy() + } + } + + // glog.V(4).Infof("cache1 %s", fileId) + + cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache) + cacher.wg.Add(1) + go cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[fileId] = cacher + + return cacher.readChunkAt(buffer, offset) +} + +func (rc *ReaderCache) UnCache(fileId string) { + rc.Lock() + defer rc.Unlock() + // glog.V(4).Infof("uncache %s", fileId) + if downloader, found := rc.downloaders[fileId]; found { + downloader.destroy() + delete(rc.downloaders, fileId) + } +} + +func (rc *ReaderCache) destroy() { + rc.Lock() + defer rc.Unlock() + + for _, downloader := range rc.downloaders { + downloader.destroy() + } + +} + +func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher { + t := &SingleChunkCacher{ + parent: parent, + chunkFileId: fileId, + cipherKey: cipherKey, + isGzipped: isGzipped, + chunkSize: chunkSize, + shouldCache: shouldCache, + } + return t +} + +func (s *SingleChunkCacher) startCaching() { + s.Lock() + defer s.Unlock() + + s.wg.Done() // means this has been started + + urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId) + if err != nil { + s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) + return + } + + s.data = mem.Allocate(s.chunkSize) + + _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + if s.err != nil { + mem.Free(s.data) + s.data = nil + return + } + + s.completedTime = time.Now() + if s.shouldCache { + s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) + } + + return +} + +func (s *SingleChunkCacher) 
destroy() { + if s.data != nil { + mem.Free(s.data) + s.data = nil + } +} + +func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { + s.RLock() + defer s.RUnlock() + + if s.err != nil { + return 0, s.err + } + + return copy(buf, s.data[offset:]), nil + +} diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go new file mode 100644 index 000000000..b860bc577 --- /dev/null +++ b/weed/filer/reader_pattern.go @@ -0,0 +1,38 @@ +package filer + +type ReaderPattern struct { + isStreaming bool + lastReadOffset int64 +} + +// For streaming read: only cache the first chunk +// For random read: only fetch the requested range, instead of the whole chunk + +func NewReaderPattern() *ReaderPattern { + return &ReaderPattern{ + isStreaming: true, + lastReadOffset: -1, + } +} + +func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { + isStreaming := true + if rp.lastReadOffset > offset { + isStreaming = false + } + if rp.lastReadOffset == -1 { + if offset != 0 { + isStreaming = false + } + } + rp.lastReadOffset = offset + rp.isStreaming = isStreaming +} + +func (rp *ReaderPattern) IsStreamingMode() bool { + return rp.isStreaming +} + +func (rp *ReaderPattern) IsRandomMode() bool { + return !rp.isStreaming +} diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 30d11a7f4..89684647b 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -3,7 +3,7 @@ package redis import ( "context" "fmt" - "sort" + "golang.org/x/exp/slices" "strings" "time" @@ -40,7 +40,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } @@ -120,6 +120,8 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full if err != nil { return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) } + // not efficient, but need to remove if it is a directory + store.Client.Del(ctx, genDirectoryListKey(string(path))) } return nil @@ -155,8 +157,8 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP } // sort - sort.Slice(members, func(i, j int) bool { - return strings.Compare(members[i], members[j]) < 0 + slices.SortFunc(members, func(a, b string) bool { + return strings.Compare(a, b) < 0 }) // limit diff --git a/weed/filer/redis2/redis_sentinel_store.go b/weed/filer/redis2/redis_sentinel_store.go new file mode 100644 index 000000000..802588b2b --- /dev/null +++ b/weed/filer/redis2/redis_sentinel_store.go @@ -0,0 +1,45 @@ +package redis2 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "time" +) + +func init() { + filer.Stores = append(filer.Stores, &Redis2SentinelStore{}) +} + +type Redis2SentinelStore struct { + UniversalRedis2Store +} + +func (store *Redis2SentinelStore) GetName() string { + return "redis2_sentinel" +} + +func (store *Redis2SentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"masterName"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store 
*Redis2SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { + store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: masterName, + SentinelAddrs: addresses, + Username: username, + Password: password, + DB: database, + MinRetryBackoff: time.Millisecond * 100, + MaxRetryBackoff: time.Minute * 1, + ReadTimeout: time.Second * 30, + WriteTimeout: time.Second * 5, + }) + return +} diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index aab3d1f4a..7a34092a0 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -52,7 +52,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } @@ -133,7 +133,10 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful return nil } - members, err := store.Client.ZRange(ctx, genDirectoryListKey(string(fullpath)), 0, -1).Result() + members, err := store.Client.ZRangeByLex(ctx, genDirectoryListKey(string(fullpath)), &redis.ZRangeBy{ + Min: "-", + Max: "+", + }).Result() if err != nil { return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) } @@ -144,6 +147,8 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful if err != nil { return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) } + // not efficient, but need to remove if it is a directory + store.Client.Del(ctx, genDirectoryListKey(string(path))) } return nil @@ -156,14 +161,22 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { dirListKey := genDirectoryListKey(string(dirPath)) - start := int64(0) + + min := "-" if startFileName != "" { - start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result() - if !includeStartFile { - start++ + if includeStartFile { + min = "[" + startFileName + } else { + min = "(" + startFileName } } - members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result() + + members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{ + Min: min, + Max: "+", + Offset: 0, + Count: limit, + }).Result() if err != nil { return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } diff --git a/weed/filer/redis3/ItemList.go b/weed/filer/redis3/ItemList.go new file mode 100644 index 000000000..af3b8ae5a --- /dev/null +++ b/weed/filer/redis3/ItemList.go @@ -0,0 +1,507 @@ +package redis3 + +import ( + "bytes" + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/util/skiplist" + "github.com/go-redis/redis/v8" +) + +type ItemList struct { + skipList *skiplist.SkipList + batchSize int + client redis.UniversalClient + prefix string +} + +func newItemList(client redis.UniversalClient, prefix string, store skiplist.ListStore, batchSize int) *ItemList { + return &ItemList{ + skipList: skiplist.New(store), + batchSize: batchSize, + client: client, + prefix: prefix, + } +} + +/* +Be reluctant to create new nodes. Try to fit into either previous node or next node. 
+Prefer to add to previous node.
+
+There are multiple cases after finding the name for greater or equal node
+ 1. found and node.Key == name
+    The node contains a batch with leading key the same as the name
+    nothing to do
+ 2. no such node found or node.Key > name
+
+   if no such node found
+     prevNode = list.LargestNode
+
+   // case 2.1
+   if previousNode contains name
+     nothing to do
+
+   // prefer to add to previous node
+   if prevNode != nil {
+     // case 2.2
+     if prevNode has capacity
+       prevNode.add name, and save
+       return
+     // case 2.3
+     split prevNode by name
+   }
+
+   // case 2.4
+   // merge into next node. Avoid too many nodes if adding data in reverse order.
+   if nextNode is not nil and nextNode has capacity
+     delete nextNode.Key
+     nextNode.Key = name
+     nextNode.batch.add name
+     insert nextNode.Key
+     return
+
+   // case 2.5
+   if prevNode is nil
+     insert new node with key = name, value = batch{name}
+     return
+
+*/
+
+func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) {
+	ctx := context.Background()
+	pipe := nl.client.TxPipeline()
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	countOperation := pipe.ZLexCount(ctx, key, "-", "+")
+	scoreOperation := pipe.ZScore(ctx, key, name)
+	if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil {
+		return false, 0, err
+	}
+	if err == redis.Nil {
+		err = nil
+	}
+	alreadyContains = scoreOperation.Err() == nil
+	nodeSize = int(countOperation.Val())
+	return
+}
+
+func (nl *ItemList) WriteName(name string) error {
+
+	lookupKey := []byte(name)
+	prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+	if err != nil {
+		return err
+	}
+	// case 1: the name already exists as one leading key in the batch
+	if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+		return nil
+	}
+
+	var prevNodeReference *skiplist.SkipListElementReference
+	if !found {
+		prevNodeReference = nl.skipList.GetLargestNodeReference()
+	}
+
+	if nextNode != nil && prevNode == nil {
+		prevNodeReference = nextNode.Prev
+	}
+
+	if prevNodeReference != nil {
+		alreadyContains, nodeSize, err := nl.canAddMember(prevNodeReference, name)
+		if err != nil {
+			return err
+		}
+		if alreadyContains {
+			// case 2.1
+			return nil
+		}
+
+		// case 2.2
+		if nodeSize < nl.batchSize {
+			return nl.NodeAddMember(prevNodeReference, name)
+		}
+
+		// case 2.3
+		x := nl.NodeInnerPosition(prevNodeReference, name)
+		y := nodeSize - x
+		addToX := x <= y
+		// add to a new node
+		if x == 0 || y == 0 {
+			if err := nl.ItemAdd(lookupKey, 0, name); err != nil {
+				return err
+			}
+			return nil
+		}
+		if addToX {
+			// collect names before name, add them to X
+			namesToX, err := nl.NodeRangeBeforeExclusive(prevNodeReference, name)
+			if err != nil {
+				return err
+			}
+			// delete skiplist reference to old node
+			if _, err := nl.skipList.DeleteByKey(prevNodeReference.Key); err != nil {
+				return err
+			}
+			// add namesToX and name to a new X
+			namesToX = append(namesToX, name)
+			if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil {
+				return err
+			}
+			// remove names less than name from current Y
+			if err := nl.NodeDeleteBeforeExclusive(prevNodeReference, name); err != nil {
+				return err
+			}
+
+			// point skip list to current Y
+			if err := nl.ItemAdd(lookupKey, prevNodeReference.ElementPointer); err != nil {
+				return err
+			}
+			return nil
+		} else {
+			// collect names after name, add them to Y
+			namesToY, err := nl.NodeRangeAfterExclusive(prevNodeReference, name)
+			if err != nil {
+				return err
+			}
+			// add namesToY and name to a new Y
+			namesToY = append(namesToY, name)
+			if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil {
+				return err
+			}
+			// remove names after name from current X
+			if err := nl.NodeDeleteAfterExclusive(prevNodeReference, name); err != nil {
+				return err
+			}
+			return nil
+		}
+
+	}
+
+	// case 2.4
+	if nextNode != nil {
+		nodeSize := nl.NodeSize(nextNode.Reference())
+		if nodeSize < nl.batchSize {
+			if id, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+				return err
+			} else {
+				if err := nl.ItemAdd(lookupKey, id, name); err != nil {
+					return err
+				}
+			}
+			return nil
+		}
+	}
+
+	// case 2.5
+	// now prevNode is nil
+	return nl.ItemAdd(lookupKey, 0, name)
+}
+
+/*
+// case 1: exists in nextNode
+if nextNode != nil && nextNode.Key == name {
+	remove from nextNode, update nextNode
+	// TODO: merge with prevNode if possible?
+	return
+}
+if nextNode is nil
+	prevNode = list.LargestNode
+if prevNode == nil and nextNode.Prev != nil
+	prevNode = load(nextNode.Prev)
+
+// case 2: does not exist
+// case 2.1
+if prevNode == nil {
+	return
+}
+// case 2.2
+if prevNameBatch does not contain name {
+	return
+}
+
+// case 3
+delete from prevNameBatch
+if prevNameBatch + nextNode < capacityList
+	// case 3.1
+	merge
+else
+	// case 3.2
+	update prevNode
+
+
+*/
+func (nl *ItemList) DeleteName(name string) error {
+	lookupKey := []byte(name)
+	prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+	if err != nil {
+		return err
+	}
+
+	// case 1
+	if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+		if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+			return err
+		}
+		if err := nl.NodeDeleteMember(nextNode.Reference(), name); err != nil {
+			return err
+		}
+		minName := nl.NodeMin(nextNode.Reference())
+		if minName == "" {
+			return nl.NodeDelete(nextNode.Reference())
+		}
+		return nl.ItemAdd([]byte(minName), nextNode.Id)
+	}
+
+	if !found {
+		prevNode, err = nl.skipList.GetLargestNode()
+		if err != nil {
+			return err
+		}
+	}
+
+	if nextNode != nil && prevNode == nil {
+		prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
+		if err != nil {
+			return err
+		}
+	}
+
+	// case 2
+	if prevNode == nil {
+		// case 2.1
+		return nil
+	}
+	if !nl.NodeContainsItem(prevNode.Reference(), name) {
+		return nil
+	}
+
+	// case 3
+	if err := nl.NodeDeleteMember(prevNode.Reference(), name); err != nil {
+		return err
+	}
+	prevSize := nl.NodeSize(prevNode.Reference())
+	if prevSize == 0 {
+		if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
+			return err
+		}
+		return nil
+	}
+	nextSize := nl.NodeSize(nextNode.Reference())
+	if nextSize > 0 && prevSize+nextSize < nl.batchSize {
+		// case 3.1 merge nextNode and prevNode
+		if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+			return err
+		}
+		nextNames, err := nl.NodeRangeBeforeExclusive(nextNode.Reference(), "")
+		if err != nil {
+			return err
+		}
+		if err := nl.NodeAddMember(prevNode.Reference(), nextNames...); err != nil {
+			return err
+		}
+		return nl.NodeDelete(nextNode.Reference())
+	} else {
+		// case 3.2 update prevNode
+		// no action to take
+		return nil
+	}
+
+	return nil
+}
+
+func (nl *ItemList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
+	lookupKey := []byte(startFrom)
+	prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+	if err != nil {
+		return err
+	}
+	if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+		prevNode = nil
+	}
+	if !found {
+		prevNode, err = nl.skipList.GetLargestNode()
+		if err != nil {
+			return err
+		}
+	}
+
+	if prevNode != nil {
+		if !nl.NodeScanInclusiveAfter(prevNode.Reference(), startFrom, visitNamesFn) {
+			return nil
+		}
+	}
+
+	for nextNode != nil {
+		if !nl.NodeScanInclusiveAfter(nextNode.Reference(), startFrom, visitNamesFn) {
+			return nil
+		}
+		nextNode, err = nl.skipList.LoadElement(nextNode.Next[0])
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (nl *ItemList) RemoveAllListElement() error {
+
+	t := nl.skipList
+
+	nodeRef := t.StartLevels[0]
+	for nodeRef != nil {
+		node, err := t.LoadElement(nodeRef)
+		if err != nil {
+			return err
+		}
+		if node == nil {
+			return nil
+		}
+		if err := t.DeleteElement(node); err != nil {
+			return err
+		}
+		if err := nl.NodeDelete(node.Reference()); err != nil {
+			return err
+		}
+		nodeRef = node.Next[0]
+	}
+	return nil
+
+}
+
+func (nl *ItemList) NodeContainsItem(node *skiplist.SkipListElementReference, item string) bool {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	_, err := nl.client.ZScore(context.Background(), key, item).Result()
+	if err == redis.Nil {
+		return false
+	}
+	if err == nil {
+		return true
+	}
+	return false
+}
+
+func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int {
+	if node == nil {
+		return 0
+	}
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	return int(nl.client.ZLexCount(context.Background(), key, "-", "+").Val())
+}
+
+func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	var members []*redis.Z
+	for _, name := range names {
+		members = append(members, &redis.Z{
+			Score:  0,
+			Member: name,
+		})
+	}
+	return nl.client.ZAddNX(context.Background(), key, members...).Err()
+}
+func (nl *ItemList) NodeDeleteMember(node *skiplist.SkipListElementReference, name string) error {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	return nl.client.ZRem(context.Background(), key, name).Err()
+}
+
+func (nl *ItemList) NodeDelete(node *skiplist.SkipListElementReference) error {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	return nl.client.Del(context.Background(), key).Err()
+}
+
+func (nl *ItemList) NodeInnerPosition(node *skiplist.SkipListElementReference, name string) int {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	return int(nl.client.ZLexCount(context.Background(), key, "-", "("+name).Val())
+}
+
+func (nl *ItemList) NodeMin(node *skiplist.SkipListElementReference) string {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	slice := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+		Min:    "-",
+		Max:    "+",
+		Offset: 0,
+		Count:  1,
+	}).Val()
+	if len(slice) > 0 {
+		s := slice[0]
+		return s
+	}
+	return ""
+}
+
+func (nl *ItemList) NodeScanInclusiveAfter(node *skiplist.SkipListElementReference, startFrom string, visitNamesFn func(name string) bool) bool {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	if startFrom == "" {
+		startFrom = "-"
+	} else {
+		startFrom = "[" + startFrom
+	}
+	names := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+		Min: startFrom,
+		Max: "+",
+	}).Val()
+	for _, n := range names {
+		if !visitNamesFn(n) {
+			return false
+		}
+	}
+	return true
+}
+
+func (nl *ItemList) NodeRangeBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) ([]string, error) {
+	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+	if stopAt 
== "" { + stopAt = "+" + } else { + stopAt = "(" + stopAt + } + return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{ + Min: "-", + Max: stopAt, + }).Result() +} +func (nl *ItemList) NodeRangeAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) ([]string, error) { + key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) + if startFrom == "" { + startFrom = "-" + } else { + startFrom = "(" + startFrom + } + return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{ + Min: startFrom, + Max: "+", + }).Result() +} + +func (nl *ItemList) NodeDeleteBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) error { + key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) + if stopAt == "" { + stopAt = "+" + } else { + stopAt = "(" + stopAt + } + return nl.client.ZRemRangeByLex(context.Background(), key, "-", stopAt).Err() +} +func (nl *ItemList) NodeDeleteAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) error { + key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) + if startFrom == "" { + startFrom = "-" + } else { + startFrom = "(" + startFrom + } + return nl.client.ZRemRangeByLex(context.Background(), key, startFrom, "+").Err() +} + +func (nl *ItemList) ItemAdd(lookupKey []byte, idIfKnown int64, names ...string) error { + if id, err := nl.skipList.InsertByKey(lookupKey, idIfKnown, nil); err != nil { + return err + } else { + if len(names) > 0 { + return nl.NodeAddMember(&skiplist.SkipListElementReference{ + ElementPointer: id, + Key: lookupKey, + }, names...) + } + } + return nil +} diff --git a/weed/filer/redis3/item_list_serde.go b/weed/filer/redis3/item_list_serde.go new file mode 100644 index 000000000..d0310ce40 --- /dev/null +++ b/weed/filer/redis3/item_list_serde.go @@ -0,0 +1,75 @@ +package redis3 + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/skiplist" + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" +) + +func LoadItemList(data []byte, prefix string, client redis.UniversalClient, store skiplist.ListStore, batchSize int) *ItemList { + + nl := &ItemList{ + skipList: skiplist.New(store), + batchSize: batchSize, + client: client, + prefix: prefix, + } + + if len(data) == 0 { + return nl + } + + message := &skiplist.SkipListProto{} + if err := proto.Unmarshal(data, message); err != nil { + glog.Errorf("loading skiplist: %v", err) + } + nl.skipList.MaxNewLevel = int(message.MaxNewLevel) + nl.skipList.MaxLevel = int(message.MaxLevel) + for i, ref := range message.StartLevels { + nl.skipList.StartLevels[i] = &skiplist.SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + } + } + for i, ref := range message.EndLevels { + nl.skipList.EndLevels[i] = &skiplist.SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + } + } + return nl +} + +func (nl *ItemList) HasChanges() bool { + return nl.skipList.HasChanges +} + +func (nl *ItemList) ToBytes() []byte { + message := &skiplist.SkipListProto{} + message.MaxNewLevel = int32(nl.skipList.MaxNewLevel) + message.MaxLevel = int32(nl.skipList.MaxLevel) + for _, ref := range nl.skipList.StartLevels { + if ref == nil { + break + } + message.StartLevels = append(message.StartLevels, &skiplist.SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + }) + } + for _, ref := range nl.skipList.EndLevels { + if ref == nil { + break + } + message.EndLevels = append(message.EndLevels, 
&skiplist.SkipListElementReference{
+			ElementPointer: ref.ElementPointer,
+			Key:            ref.Key,
+		})
+	}
+	data, err := proto.Marshal(message)
+	if err != nil {
+		glog.Errorf("marshal skiplist: %v", err)
+	}
+	return data
+}
diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go
new file mode 100644
index 000000000..d92dddfe6
--- /dev/null
+++ b/weed/filer/redis3/kv_directory_children.go
@@ -0,0 +1,138 @@
+package redis3
+
+import (
+	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/go-redis/redis/v8"
+)
+
+const maxNameBatchSizeLimit = 1000000
+
+func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
+
+	// lock and unlock
+	mutex := redisStore.redsync.NewMutex(key + "lock")
+	if err := mutex.Lock(); err != nil {
+		return fmt.Errorf("lock %s: %v", key, err)
+	}
+	defer mutex.Unlock()
+
+	client := redisStore.Client
+	data, err := client.Get(ctx, key).Result()
+	if err != nil {
+		if err != redis.Nil {
+			return fmt.Errorf("read %s: %v", key, err)
+		}
+	}
+	store := newSkipListElementStore(key, client)
+	nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+	if err := nameList.WriteName(name); err != nil {
+		glog.Errorf("add %s %s: %v", key, name, err)
+		return err
+	}
+
+	if !nameList.HasChanges() {
+		return nil
+	}
+
+	if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
+
+	// lock and unlock
+	mutex := redisStore.redsync.NewMutex(key + "lock")
+	if err := mutex.Lock(); err != nil {
+		return fmt.Errorf("lock %s: %v", key, err)
+	}
+	defer mutex.Unlock()
+
+	client := redisStore.Client
+	data, err := client.Get(ctx, key).Result()
+	if err != nil {
+		if err != redis.Nil {
+			return fmt.Errorf("read %s: %v", key, err)
+		}
+	}
+	store := newSkipListElementStore(key, client)
+	nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+	if err := nameList.DeleteName(name); err != nil {
+		return err
+	}
+	if !nameList.HasChanges() {
+		return nil
+	}
+
+	if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, onDeleteFn func(name string) error) error {
+
+	// lock and unlock
+	mutex := redisStore.redsync.NewMutex(key + "lock")
+	if err := mutex.Lock(); err != nil {
+		return fmt.Errorf("lock %s: %v", key, err)
+	}
+	defer mutex.Unlock()
+
+	client := redisStore.Client
+	data, err := client.Get(ctx, key).Result()
+	if err != nil {
+		if err != redis.Nil {
+			return fmt.Errorf("read %s: %v", key, err)
+		}
+	}
+	store := newSkipListElementStore(key, client)
+	nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+	if err = nameList.ListNames("", func(name string) bool {
+		if err := onDeleteFn(name); err != nil {
+			glog.Errorf("delete %s child %s: %v", key, name, err)
+			return false
+		}
+		return true
+	}); err != nil {
+		return err
+	}
+
+	if err = nameList.RemoveAllListElement(); err != nil {
+		return err
+	}
+
+	return nil
+
+}
+
+func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, startFileName string, eachFn func(name string) bool) error {
+	client := redisStore.Client
+	data, err := client.Get(ctx, key).Result()
+	if err != nil {
+		if err != 
redis.Nil { + return fmt.Errorf("read %s: %v", key, err) + } + } + store := newSkipListElementStore(key, client) + nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit) + + if err = nameList.ListNames(startFileName, func(name string) bool { + return eachFn(name) + }); err != nil { + return err + } + + return nil + +} diff --git a/weed/filer/redis3/kv_directory_children_test.go b/weed/filer/redis3/kv_directory_children_test.go new file mode 100644 index 000000000..9d7acacf1 --- /dev/null +++ b/weed/filer/redis3/kv_directory_children_test.go @@ -0,0 +1,210 @@ +package redis3 + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/stvp/tempredis" + "strconv" + "testing" + "time" +) + +var names = []string{ + "cassandra.in.sh", + "cassandra", + "debug-cql.bat", + "nodetool", + "nodetool.bat", + "source-conf.ps1", + "sstableloader", + "sstableloader.bat", + "sstablescrub", + "sstablescrub.bat", + "sstableupgrade", + "sstableupgrade.bat", + "sstableutil", + "sstableutil.bat", + "sstableverify", + "sstableverify.bat", + "stop-server", + "stop-server.bat", + "stop-server.ps1", + "cassandra.in.bat", + "cqlsh.py", + "cqlsh", + "cassandra.ps1", + "cqlsh.bat", + "debug-cql", + "cassandra.bat", +} + +func yTestNameList(t *testing.T) { + server, err := tempredis.Start(tempredis.Config{}) + if err != nil { + panic(err) + } + defer server.Term() + + client := redis.NewClient(&redis.Options{ + Network: "unix", + Addr: server.Socket(), + }) + + store := newSkipListElementStore("/yyy/bin", client) + var data []byte + for _, name := range names { + nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit) + nameList.WriteName(name) + + nameList.ListNames("", func(name string) bool { + println(name) + return true + }) + + if nameList.HasChanges() { + data = nameList.ToBytes() + } + println() + } + + nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit) + nameList.ListNames("", func(name string) bool { + println(name) + return true + }) + +} + +func yBenchmarkNameList(b *testing.B) { + + server, err := tempredis.Start(tempredis.Config{}) + if err != nil { + panic(err) + } + defer server.Term() + + client := redis.NewClient(&redis.Options{ + Network: "unix", + Addr: server.Socket(), + }) + + store := newSkipListElementStore("/yyy/bin", client) + var data []byte + for i := 0; i < b.N; i++ { + nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit) + + nameList.WriteName(strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx") + + if nameList.HasChanges() { + data = nameList.ToBytes() + } + } +} + +func BenchmarkRedis(b *testing.B) { + + server, err := tempredis.Start(tempredis.Config{}) + if err != nil { + panic(err) + } + defer server.Term() + + client := redis.NewClient(&redis.Options{ + Network: "unix", + Addr: server.Socket(), + }) + + for i := 0; i < b.N; i++ { + client.ZAddNX(context.Background(), "/yyy/bin", &redis.Z{Score: 0, Member: strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx"}) + } +} + +func xTestNameListAdd(t *testing.T) { + + server, err := tempredis.Start(tempredis.Config{}) + if err != nil { + panic(err) + } + defer server.Term() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + client.FlushAll(context.Background()) + + N := 364800 + + ts0 := time.Now() + store := newSkipListElementStore("/y", client) + var data []byte + nameList := LoadItemList(data, "/y", client, store, 100000) + for i := 0; i < N; i++ { + 
nameList.WriteName(fmt.Sprintf("%8d", i)) + } + + ts1 := time.Now() + + for i := 0; i < N; i++ { + client.ZAddNX(context.Background(), "/x", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)}) + } + ts2 := time.Now() + + fmt.Printf("%v %v", ts1.Sub(ts0), ts2.Sub(ts1)) + + /* + keys := client.Keys(context.Background(), "/*m").Val() + for _, k := range keys { + println("key", k) + for i, v := range client.ZRangeByLex(context.Background(), k, &redis.ZRangeBy{ + Min: "-", + Max: "+", + }).Val() { + println(" ", i, v) + } + } + */ +} + +func xBenchmarkNameList(b *testing.B) { + + server, err := tempredis.Start(tempredis.Config{}) + if err != nil { + panic(err) + } + defer server.Term() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + store := newSkipListElementStore("/yyy/bin", client) + var data []byte + for i := 0; i < b.N; i++ { + nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit) + + nameList.WriteName(fmt.Sprintf("name %8d", i)) + + if nameList.HasChanges() { + data = nameList.ToBytes() + } + } +} + +func xBenchmarkRedis(b *testing.B) { + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + for i := 0; i < b.N; i++ { + client.ZAddNX(context.Background(), "/xxx/bin", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)}) + } +} diff --git a/weed/filer/redis3/redis_cluster_store.go b/weed/filer/redis3/redis_cluster_store.go new file mode 100644 index 000000000..73fc0dd20 --- /dev/null +++ b/weed/filer/redis3/redis_cluster_store.go @@ -0,0 +1,45 @@ +package redis3 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisCluster3Store{}) +} + +type RedisCluster3Store struct { + UniversalRedis3Store +} + +func (store *RedisCluster3Store) GetName() string { + return "redis_cluster3" +} + +func (store *RedisCluster3Store) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", false) + configuration.SetDefault(prefix+"routeByLatency", false) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + ) +} + +func (store *RedisCluster3Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + store.redsync = redsync.New(goredis.NewPool(store.Client)) + return +} diff --git a/weed/filer/redis3/redis_sentinel_store.go b/weed/filer/redis3/redis_sentinel_store.go new file mode 100644 index 000000000..a87302167 --- /dev/null +++ b/weed/filer/redis3/redis_sentinel_store.go @@ -0,0 +1,49 @@ +package redis3 + +import ( + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &Redis3SentinelStore{}) +} + +type Redis3SentinelStore struct { + UniversalRedis3Store +} + +func (store 
*Redis3SentinelStore) GetName() string { + return "redis3_sentinel" +} + +func (store *Redis3SentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"masterName"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis3SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { + store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: masterName, + SentinelAddrs: addresses, + Username: username, + Password: password, + DB: database, + MinRetryBackoff: time.Millisecond * 100, + MaxRetryBackoff: time.Minute * 1, + ReadTimeout: time.Second * 30, + WriteTimeout: time.Second * 5, + }) + store.redsync = redsync.New(goredis.NewPool(store.Client)) + return +} diff --git a/weed/filer/redis3/redis_store.go b/weed/filer/redis3/redis_store.go new file mode 100644 index 000000000..2eec3d37a --- /dev/null +++ b/weed/filer/redis3/redis_store.go @@ -0,0 +1,39 @@ +package redis3 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &Redis3Store{}) +} + +type Redis3Store struct { + UniversalRedis3Store +} + +func (store *Redis3Store) GetName() string { + return "redis3" +} + +func (store *Redis3Store) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis3Store) initialize(hostPort string, password string, database int) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + store.redsync = redsync.New(goredis.NewPool(store.Client)) + return +} diff --git a/weed/filer/redis3/skiplist_element_store.go b/weed/filer/redis3/skiplist_element_store.go new file mode 100644 index 000000000..8c101d006 --- /dev/null +++ b/weed/filer/redis3/skiplist_element_store.go @@ -0,0 +1,62 @@ +package redis3 + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/skiplist" + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" +) + +type SkipListElementStore struct { + Prefix string + client redis.UniversalClient +} + +var _ = skiplist.ListStore(&SkipListElementStore{}) + +func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore { + return &SkipListElementStore{ + Prefix: prefix, + client: client, + } +} + +func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error { + key := fmt.Sprintf("%s%d", m.Prefix, id) + data, err := proto.Marshal(element) + if err != nil { + glog.Errorf("marshal %s: %v", key, err) + } + return m.client.Set(context.Background(), key, data, 0).Err() +} + +func (m *SkipListElementStore) DeleteElement(id int64) error { + key := fmt.Sprintf("%s%d", m.Prefix, id) + return m.client.Del(context.Background(), key).Err() +} + +func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) { + key := 
fmt.Sprintf("%s%d", m.Prefix, id) + data, err := m.client.Get(context.Background(), key).Result() + if err != nil { + if err == redis.Nil { + return nil, nil + } + return nil, err + } + t := &skiplist.SkipListElement{} + err = proto.Unmarshal([]byte(data), t) + if err == nil { + for i := 0; i < len(t.Next); i++ { + if t.Next[i].IsNil() { + t.Next[i] = nil + } + } + if t.Prev.IsNil() { + t.Prev = nil + } + } + return t, err +} diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go new file mode 100644 index 000000000..10a87e2a4 --- /dev/null +++ b/weed/filer/redis3/universal_redis_store.go @@ -0,0 +1,179 @@ +package redis3 + +import ( + "context" + "fmt" + "github.com/go-redsync/redsync/v4" + "time" + + "github.com/go-redis/redis/v8" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedis3Store struct { + Client redis.UniversalClient + redsync *redsync.Redsync +} + +func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + value = util.MaybeGzipData(value) + } + + if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + + if name != "" { + if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + data, err := store.Client.Get(ctx, string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result() + if err != nil { + return fmt.Errorf("delete dir list %s : %v", fullpath, err) + } + + _, err = store.Client.Del(ctx, string(fullpath)).Result() + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + dir, name := fullpath.DirAndName() + + if name != "" { + if err = removeChild(ctx, store, genDirectoryListKey(dir), name); err != nil { + return 
fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + + return removeChildren(ctx, store, genDirectoryListKey(string(fullpath)), func(name string) error { + path := util.NewFullPath(string(fullpath), name) + _, err = store.Client.Del(ctx, string(path)).Result() + if err != nil { + return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) + } + // not efficient, but need to remove if it is a directory + store.Client.Del(ctx, genDirectoryListKey(string(path))) + return nil + }) + +} + +func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + dirListKey := genDirectoryListKey(string(dirPath)) + counter := int64(0) + + err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool { + if startFileName != "" { + if !includeStartFile && startFileName == fileName { + return true + } + } + + path := util.NewFullPath(string(dirPath), fileName) + entry, err := store.FindEntry(ctx, path) + lastFileName = fileName + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + if err == filer_pb.ErrNotFound { + return true + } + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(ctx, string(path)).Result() + store.Client.ZRem(ctx, dirListKey, fileName).Result() + return true + } + } + counter++ + if !eachEntryFunc(entry) { + return false + } + if counter >= limit { + return false + } + } + return true + }) + + return lastFileName, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedis3Store) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/redis3/universal_redis_store_kv.go b/weed/filer/redis3/universal_redis_store_kv.go new file mode 100644 index 000000000..a9c440a37 --- /dev/null +++ b/weed/filer/redis3/universal_redis_store_kv.go @@ -0,0 +1,42 @@ +package redis3 + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/go-redis/redis/v8" +) + +func (store *UniversalRedis3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + _, err = store.Client.Set(ctx, string(key), value, 0).Result() + + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *UniversalRedis3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + data, err := store.Client.Get(ctx, string(key)).Result() + + if err == redis.Nil { + return nil, filer.ErrKvNotFound + } + + return []byte(data), err +} + +func (store *UniversalRedis3Store) KvDelete(ctx context.Context, key []byte) (err error) { + + _, err = store.Client.Del(ctx, string(key)).Result() + + if err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/filer/redis_lua/redis_cluster_store.go b/weed/filer/redis_lua/redis_cluster_store.go new file mode 
100644 index 000000000..b68d1092c --- /dev/null +++ b/weed/filer/redis_lua/redis_cluster_store.go @@ -0,0 +1,44 @@ +package redis_lua + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisLuaClusterStore{}) +} + +type RedisLuaClusterStore struct { + UniversalRedisLuaStore +} + +func (store *RedisLuaClusterStore) GetName() string { + return "redis_lua_cluster" +} + +func (store *RedisLuaClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", false) + configuration.SetDefault(prefix+"routeByLatency", false) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), + ) +} + +func (store *RedisLuaClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + store.loadSuperLargeDirectories(superLargeDirectories) + return +} diff --git a/weed/filer/redis_lua/redis_sentinel_store.go b/weed/filer/redis_lua/redis_sentinel_store.go new file mode 100644 index 000000000..5530c098e --- /dev/null +++ b/weed/filer/redis_lua/redis_sentinel_store.go @@ -0,0 +1,45 @@ +package redis_lua + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" + "time" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisLuaSentinelStore{}) +} + +type RedisLuaSentinelStore struct { + UniversalRedisLuaStore +} + +func (store *RedisLuaSentinelStore) GetName() string { + return "redis_lua_sentinel" +} + +func (store *RedisLuaSentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"masterName"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { + store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: masterName, + SentinelAddrs: addresses, + Username: username, + Password: password, + DB: database, + MinRetryBackoff: time.Millisecond * 100, + MaxRetryBackoff: time.Minute * 1, + ReadTimeout: time.Second * 30, + WriteTimeout: time.Second * 5, + }) + return +} diff --git a/weed/filer/redis_lua/redis_store.go b/weed/filer/redis_lua/redis_store.go new file mode 100644 index 000000000..a7d11c73c --- /dev/null +++ b/weed/filer/redis_lua/redis_store.go @@ -0,0 +1,38 @@ +package redis_lua + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisLuaStore{}) +} + +type RedisLuaStore struct { + UniversalRedisLuaStore +} + +func (store *RedisLuaStore) GetName() string { + return "redis_lua" +} + +func (store 
*RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), + ) +} + +func (store *RedisLuaStore) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + store.loadSuperLargeDirectories(superLargeDirectories) + return +} diff --git a/weed/filer/redis_lua/stored_procedure/delete_entry.lua b/weed/filer/redis_lua/stored_procedure/delete_entry.lua new file mode 100644 index 000000000..445337c77 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/delete_entry.lua @@ -0,0 +1,19 @@ +-- KEYS[1]: full path of entry +local fullpath = KEYS[1] +-- KEYS[2]: dir list key of the entry itself +local fullpath_list_key = KEYS[2] +-- KEYS[3]: dir list key of the entry's parent directory +local dir_list_key = KEYS[3] + +-- ARGV[1]: isSuperLargeDirectory +local isSuperLargeDirectory = ARGV[1] == "1" +-- ARGV[2]: name of the entry +local name = ARGV[2] + +redis.call("DEL", fullpath, fullpath_list_key) + +if not isSuperLargeDirectory and name ~= "" then + redis.call("ZREM", dir_list_key, name) +end + +return 0
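For orientation, here is a hedged client-side sketch of what delete_entry.lua replaces, assuming go-redis v8 and the key layout used throughout these stores (an entry key holds the serialized entry; the same key suffixed with the "\x00" marker holds a sorted set of child names). The Lua version performs the same steps in a single server-side round trip, so the DEL and the conditional ZREM cannot interleave with other writers:

```go
package main

import (
	"context"

	"github.com/go-redis/redis/v8"
)

// deleteEntryPlain is a hypothetical, non-atomic equivalent of delete_entry.lua.
// fullpath holds the serialized entry, fullpath+"\x00" is the entry's own child
// list, and dir+"\x00" is the parent directory's child list.
func deleteEntryPlain(ctx context.Context, c redis.UniversalClient,
	fullpath, dir, name string, isSuperLargeDirectory bool) error {
	if err := c.Del(ctx, fullpath, fullpath+"\x00").Err(); err != nil {
		return err
	}
	// super large directories skip the parent list, exactly as in the script
	if !isSuperLargeDirectory && name != "" {
		return c.ZRem(ctx, dir+"\x00", name).Err()
	}
	return nil
}
```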
\ No newline at end of file diff --git a/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua new file mode 100644 index 000000000..77e4839f9 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua @@ -0,0 +1,15 @@ +-- KEYS[1]: full path of entry +local fullpath = KEYS[1] + +if fullpath ~= "" and string.sub(fullpath, -1) == "/" then + fullpath = string.sub(fullpath, 0, -2) +end + +local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1") + +for _, name in ipairs(files) do + local file_path = fullpath .. "/" .. name + redis.call("DEL", file_path, file_path .. "\0") +end + +return 0
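The folder-children script walks the directory's child list and deletes each entry together with that entry's own child list. As a rough Go rendering (again a sketch over go-redis v8, not the code this commit ships), the traversal looks like this; running it inside Redis avoids one network round trip per child:

```go
package main

import (
	"context"
	"strings"

	"github.com/go-redis/redis/v8"
)

// deleteFolderChildrenPlain mirrors delete_folder_children.lua on the client
// side (hypothetical helper): read the child names from the zset at dir+"\x00",
// then delete each child entry plus the child's own directory list.
func deleteFolderChildrenPlain(ctx context.Context, c redis.UniversalClient, dir string) error {
	dir = strings.TrimSuffix(dir, "/") // the script strips a trailing slash too
	names, err := c.ZRange(ctx, dir+"\x00", 0, -1).Result()
	if err != nil {
		return err
	}
	for _, name := range names {
		filePath := dir + "/" + name
		if err := c.Del(ctx, filePath, filePath+"\x00").Err(); err != nil {
			return err
		}
	}
	return nil
}
```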
\ No newline at end of file diff --git a/weed/filer/redis_lua/stored_procedure/init.go b/weed/filer/redis_lua/stored_procedure/init.go new file mode 100644 index 000000000..1412ceba2 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/init.go @@ -0,0 +1,24 @@ +package stored_procedure + +import ( + _ "embed" + "github.com/go-redis/redis/v8" +) + +func init() { + InsertEntryScript = redis.NewScript(insertEntry) + DeleteEntryScript = redis.NewScript(deleteEntry) + DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren) +} + +//go:embed insert_entry.lua +var insertEntry string +var InsertEntryScript *redis.Script + +//go:embed delete_entry.lua +var deleteEntry string +var DeleteEntryScript *redis.Script + +//go:embed delete_folder_children.lua +var deleteFolderChildren string +var DeleteFolderChildrenScript *redis.Script diff --git a/weed/filer/redis_lua/stored_procedure/insert_entry.lua b/weed/filer/redis_lua/stored_procedure/insert_entry.lua new file mode 100644 index 000000000..8deef3446 --- /dev/null +++ b/weed/filer/redis_lua/stored_procedure/insert_entry.lua @@ -0,0 +1,27 @@ +-- KEYS[1]: full path of entry +local full_path = KEYS[1] +-- KEYS[2]: dir of the entry +local dir_list_key = KEYS[2] + +-- ARGV[1]: content of the entry +local entry = ARGV[1] +-- ARGV[2]: TTL of the entry +local ttlSec = tonumber(ARGV[2]) +-- ARGV[3]: isSuperLargeDirectory +local isSuperLargeDirectory = ARGV[3] == "1" +-- ARGV[4]: zscore of the entry in zset +local zscore = tonumber(ARGV[4]) +-- ARGV[5]: name of the entry +local name = ARGV[5] + +if ttlSec > 0 then + redis.call("SET", full_path, entry, "EX", ttlSec) +else + redis.call("SET", full_path, entry) +end + +if not isSuperLargeDirectory and name ~= "" then + redis.call("ZADD", dir_list_key, "NX", zscore, name) +end + +return 0
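These scripts are embedded with go:embed and wrapped in redis.NewScript (init.go above). With go-redis, Script.Run first tries EVALSHA and falls back to EVAL when the server has not cached the script, so no explicit SCRIPT LOAD step is needed. Note also that the Go callers pass a zscore of 0 for every child, so the directory zsets keep all members at equal score and Redis orders them lexicographically, which is what makes the ZRANGEBYLEX paging in the listing code below valid. A minimal invocation sketch, with purely illustrative keys and values:

```go
package main

import (
	"context"

	"github.com/go-redis/redis/v8"

	"github.com/chrislusf/seaweedfs/weed/filer/redis_lua/stored_procedure"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumes a local Redis

	// KEYS: the entry key and the parent directory's list key ("\x00" marker);
	// ARGV: serialized entry, TTL seconds, isSuperLargeDirectory, zscore, name.
	err := stored_procedure.InsertEntryScript.Run(ctx, client,
		[]string{"/dir/file", "/dir\x00"},
		[]byte("serialized-entry-bytes"), 0, false, 0, "file").Err()
	if err != nil {
		panic(err)
	}
}
```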
\ No newline at end of file diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go new file mode 100644 index 000000000..0ab0f2f24 --- /dev/null +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -0,0 +1,191 @@ +package redis_lua + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis/v8" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/redis_lua/stored_procedure" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedisLuaStore struct { + Client redis.UniversalClient + superLargeDirectoryHash map[string]bool +} + +func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) { + _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) { + // set directory hash + store.superLargeDirectoryHash = make(map[string]bool) + for _, dir := range superLargeDirectories { + store.superLargeDirectoryHash[dir] = true + } +} + +func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + value = util.MaybeGzipData(value) + } + + dir, name := entry.FullPath.DirAndName() + + err = stored_procedure.InsertEntryScript.Run(ctx, store.Client, + []string{string(entry.FullPath), genDirectoryListKey(dir)}, + value, entry.TtlSec, + store.isSuperLargeDirectory(dir), 0, name).Err() + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + data, err := store.Client.Get(ctx, string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + dir, name := fullpath.DirAndName() + + err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client, + []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)}, + store.isSuperLargeDirectory(dir), name).Err() + + if err != nil { + return fmt.Errorf("DeleteEntry %s : %v", fullpath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx 
context.Context, fullpath util.FullPath) (err error) { + + if store.isSuperLargeDirectory(string(fullpath)) { + return nil + } + + err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client, + []string{string(fullpath)}).Err() + + if err != nil { + return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + dirListKey := genDirectoryListKey(string(dirPath)) + + min := "-" + if startFileName != "" { + if includeStartFile { + min = "[" + startFileName + } else { + min = "(" + startFileName + } + } + + members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{ + Min: min, + Max: "+", + Offset: 0, + Count: limit, + }).Result() + if err != nil { + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(dirPath), fileName) + entry, err := store.FindEntry(ctx, path) + lastFileName = fileName + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + if err == filer_pb.ErrNotFound { + continue + } + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.DeleteEntry(ctx, path) + continue + } + } + if !eachEntryFunc(entry) { + break + } + } + } + + return lastFileName, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedisLuaStore) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/redis_lua/universal_redis_store_kv.go b/weed/filer/redis_lua/universal_redis_store_kv.go new file mode 100644 index 000000000..3df980b66 --- /dev/null +++ b/weed/filer/redis_lua/universal_redis_store_kv.go @@ -0,0 +1,42 @@ +package redis_lua + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/go-redis/redis/v8" +) + +func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + _, err = store.Client.Set(ctx, string(key), value, 0).Result() + + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + data, err := store.Client.Get(ctx, string(key)).Result() + + if err == redis.Nil { + return nil, filer.ErrKvNotFound + } + + return []byte(data), err +} + +func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) { + + _, err = store.Client.Del(ctx, string(key)).Result() + + if err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go new file mode 100644 index 000000000..b0534e2ca --- /dev/null +++ b/weed/filer/remote_mapping.go @@ -0,0 +1,121 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + 
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" +) + +func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) { + var oldContent []byte + if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return readErr + }); readErr != nil { + return nil, readErr + } + + mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) + if readErr != nil { + return nil, fmt.Errorf("unmarshal mappings: %v", readErr) + } + + return +} + +func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing mapping: %v", err) + } + } + + // add new mapping + newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation) + if err != nil { + return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err) + } + + // save back + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +} + +func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing mapping: %v", err) + } + } + + // add new mapping + newContent, err = removeRemoteStorageMapping(oldContent, dir) + if err != nil { + return fmt.Errorf("delete mount %s: %v", dir, err) + } + + // save back + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +} + +func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) { + mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) + if unmarshalErr != nil { + // skip + } + + // set the new mapping + mappings.Mappings[dir] = storageLocation + + if newContent, err = proto.Marshal(mappings); err != nil { + return oldContent, fmt.Errorf("marshal mappings: %v", err) + } + + return +} + +func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) { + mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) + if unmarshalErr != nil { + return nil, unmarshalErr + } + + // set the new mapping + delete(mappings.Mappings, dir) + + if newContent, err = proto.Marshal(mappings); err != nil { + return oldContent, fmt.Errorf("marshal mappings: 
%v", err) + } + + return +} diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/remote_storage.go index 65704e652..5362ba738 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/remote_storage.go @@ -131,58 +131,9 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo return } -func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) { - mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) - if unmarshalErr != nil { - // skip - } - - // set the new mapping - mappings.Mappings[dir] = storageLocation - - if newContent, err = proto.Marshal(mappings); err != nil { - return oldContent, fmt.Errorf("marshal mappings: %v", err) - } - - return -} - -func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) { - mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) - if unmarshalErr != nil { - return nil, unmarshalErr - } - - // set the new mapping - delete(mappings.Mappings, dir) - - if newContent, err = proto.Marshal(mappings); err != nil { - return oldContent, fmt.Errorf("marshal mappings: %v", err) - } - - return -} - -func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *remote_pb.RemoteStorageMapping, readErr error) { - var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) - return readErr - }); readErr != nil { - return nil, readErr - } - - mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) - if readErr != nil { - return nil, fmt.Errorf("unmarshal mappings: %v", readErr) - } - - return -} - -func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *remote_pb.RemoteConf, readErr error) { +func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) return readErr }); readErr != nil { @@ -199,7 +150,7 @@ func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, return } -func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress string, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) { +func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) { mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress) if listErr != nil { diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/remote_storage_test.go index 9f4d7af2f..9f4d7af2f 100644 --- a/weed/filer/filer_remote_storage_test.go +++ b/weed/filer/remote_storage_test.go diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index 729da7d9b..f48c3988c 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -10,7 
+10,7 @@ import ( "io" "os" - "github.com/tecbot/gorocksdb" + gorocksdb "github.com/linxGnu/grocksdb" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go index f6e755b4b..faabcd341 100644 --- a/weed/filer/rocksdb/rocksdb_store_test.go +++ b/weed/filer/rocksdb/rocksdb_store_test.go @@ -1,3 +1,4 @@ +//go:build rocksdb // +build rocksdb package rocksdb @@ -5,7 +6,6 @@ package rocksdb import ( "context" "fmt" - "io/ioutil" "os" "testing" "time" @@ -16,8 +16,7 @@ import ( func TestCreateAndFind(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -35,7 +34,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil { + if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } @@ -70,8 +69,7 @@ func TestCreateAndFind(t *testing.T) { func TestEmptyRoot(t *testing.T) { testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") - defer os.RemoveAll(dir) + dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) testFiler.SetStore(store) @@ -93,8 +91,7 @@ func TestEmptyRoot(t *testing.T) { func BenchmarkInsertEntry(b *testing.B) { testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) - dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench") - defer os.RemoveAll(dir) + dir := b.TempDir() store := &RocksDBStore{} store.initialize(dir) testFiler.SetStore(store) diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go index faed22310..7e9643083 100644 --- a/weed/filer/rocksdb/rocksdb_ttl.go +++ b/weed/filer/rocksdb/rocksdb_ttl.go @@ -5,7 +5,7 @@ package rocksdb import ( "time" - "github.com/tecbot/gorocksdb" + gorocksdb "github.com/linxGnu/grocksdb" "github.com/chrislusf/seaweedfs/weed/filer" ) @@ -38,3 +38,8 @@ func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []by func (t *TTLFilter) Name() string { return "TTLFilter" } +func (t *TTLFilter) SetIgnoreSnapshots(value bool) { +} + +func (t *TTLFilter) Destroy() { +} diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go index 55c976915..891bf925b 100644 --- a/weed/filer/s3iam_conf.go +++ b/weed/filer/s3iam_conf.go @@ -2,13 +2,12 @@ package filer import ( "bytes" - "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "io" ) -func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfiguration) error { +func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error { if err := jsonpb.Unmarshal(bytes.NewBuffer(content), config); err != nil { return err } diff --git a/weed/filer/sqlite/doc.go b/weed/filer/sqlite/doc.go new file mode 100644 index 000000000..833addf54 --- /dev/null +++ b/weed/filer/sqlite/doc.go @@ -0,0 +1,9 @@ +/* + +Package sqlite is for sqlite filer store. + +The referenced "modernc.org/sqlite" library is too big when compiled. +So this is only compiled in "make full_install". 
+ +*/ +package sqlite diff --git a/weed/filer/sqlite/sqlite_store.go b/weed/filer/sqlite/sqlite_store.go index 6b055e53c..70a4bf390 100644 --- a/weed/filer/sqlite/sqlite_store.go +++ b/weed/filer/sqlite/sqlite_store.go @@ -1,4 +1,6 @@ +//go:build (linux || darwin || windows) && sqlite // +build linux darwin windows +// +build sqlite // limited GOOS due to modernc.org/libc/unistd diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go index 803c71afa..351d2e501 100644 --- a/weed/filer/sqlite/sqlite_store_unsupported.go +++ b/weed/filer/sqlite/sqlite_store_unsupported.go @@ -1,4 +1,5 @@ -// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64 +//go:build !linux && !darwin && !windows && !s390 && !ppc64le && !mips64 && !sqlite +// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64,!sqlite // limited GOOS due to modernc.org/libc/unistd diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go new file mode 100644 index 000000000..ad578442c --- /dev/null +++ b/weed/filer/store_test/test_suite.go @@ -0,0 +1,55 @@ +package store_test + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestFilerStore(t *testing.T, store filer.FilerStore) { + ctx := context.Background() + + store.InsertEntry(ctx, makeEntry(util.FullPath("/"), true)) + store.InsertEntry(ctx, makeEntry(util.FullPath("/a"), true)) + store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b"), true)) + store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b/c"), true)) + for i := 0; i < 2000; i++ { + store.InsertEntry(ctx, makeEntry(util.FullPath(fmt.Sprintf("/a/b/c/f%05d", i)), false)) + } + + { + var counter int + lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool { + counter++ + return true + }) + assert.Nil(t, err, "list directory") + assert.Equal(t, 3, counter, "directory list counter") + assert.Equal(t, "f00003", lastFileName, "directory list last file") + lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool { + counter++ + return true + }) + assert.Nil(t, err, "list directory") + assert.Equal(t, 1027, counter, "directory list counter") + assert.Equal(t, "f01027", lastFileName, "directory list last file") + } + +} + +func makeEntry(fullPath util.FullPath, isDirectory bool) *filer.Entry { + var mode os.FileMode + if isDirectory { + mode = os.ModeDir + } + return &filer.Entry{ + FullPath: fullPath, + Attr: filer.Attr{ + Mode: mode, + }, + } +} diff --git a/weed/filer/stream.go b/weed/filer/stream.go index ce0264cd3..7da9fd0a0 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -3,6 +3,7 @@ package filer import ( "bytes" "fmt" + "golang.org/x/exp/slices" "io" "math" "sort" @@ -39,11 +40,11 @@ func isSameChunks(a, b []*filer_pb.FileChunk) bool { if len(a) != len(b) { return false } - sort.Slice(a, func(i, j int) bool { - return strings.Compare(a[i].ETag, a[j].ETag) < 0 + slices.SortFunc(a, func(i, j *filer_pb.FileChunk) bool { + return strings.Compare(i.ETag, j.ETag) < 0 }) - sort.Slice(b, func(i, j int) bool { - return strings.Compare(b[i].ETag, b[j].ETag) < 0 + slices.SortFunc(b, func(i, j *filer_pb.FileChunk) bool { + return strings.Compare(i.ETag, j.ETag) < 0 }) for i := 0; i < len(a); i++ { if a[i].ETag != b[i].ETag { @@ -62,7 +63,7 @@ func 
NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) + glog.V(4).Infof("start to stream content for chunks: %+v", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -80,11 +81,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ fileId2Url[chunkView.FileId] = urlStrings } + remaining := size for _, chunkView := range chunkViews { - + if offset < chunkView.LogicOffset { + gap := chunkView.LogicOffset - offset + remaining -= gap + glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset) + err := writeZero(writer, gap) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset) + } + offset = chunkView.LogicOffset + } urlStrings := fileId2Url[chunkView.FileId] start := time.Now() err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + offset += int64(chunkView.Size) + remaining -= int64(chunkView.Size) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() @@ -92,6 +105,13 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() } + if remaining > 0 { + glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) + err := writeZero(writer, remaining) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + } + } return nil @@ -99,42 +119,59 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ // ---------------- ReadAllReader ---------------------------------- -func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { +func writeZero(w io.Writer, size int64) (err error) { + zeroPadding := make([]byte, 1024) + var written int + for size > 0 { + if size > 1024 { + written, err = w.Write(zeroPadding) + } else { + written, err = w.Write(zeroPadding[:size]) + } + size -= int64(written) + if err != nil { + return + } + } + return +} - buffer := bytes.Buffer{} +func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error { lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { return masterClient.LookupFileId(fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer))) + + idx := 0 for _, chunkView := range chunkViews { urlStrings, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return nil, err + return err } - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset) if err != nil { - return nil, err + return err } - buffer.Write(data) + idx += n } - return buffer.Bytes(), nil 
+ return nil } // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - totalSize int64 - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferLock sync.Mutex - chunk string - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + logicOffset int64 + buffer []byte + bufferOffset int64 + bufferLock sync.Mutex + chunk string + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) @@ -143,8 +180,8 @@ var _ = io.ReaderAt(&ChunkStreamReader{}) func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - sort.Slice(chunkViews, func(i, j int) bool { - return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset + slices.SortFunc(chunkViews, func(a, b *ChunkView) bool { + return a.LogicOffset < b.LogicOffset }) var totalSize int64 @@ -206,7 +243,7 @@ func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) { } func (c *ChunkStreamReader) isBufferEmpty() bool { - return len(c.buffer) <= int(c.logicOffset - c.bufferOffset) + return len(c.buffer) <= int(c.logicOffset-c.bufferOffset) } func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { @@ -261,7 +298,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { } else if currentChunkIndex > 0 { if insideChunk(offset, c.chunkViews[currentChunkIndex]) { // good hit - } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]){ + } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) { currentChunkIndex -= 1 // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) } else { @@ -297,7 +334,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index 4a8e8784d..f333af38e 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -1,5 +1,3 @@ -// +build tikv - package tikv import ( @@ -14,7 +12,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv" ) @@ -27,8 +24,9 @@ func init() { } type TikvStore struct { - client *tikv.KVStore + client *txnkv.Client deleteRangeConcurrency int + onePC bool } // Basic APIs @@ -46,12 +44,13 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err if drc <= 0 { drc = 1 } + store.onePC = config.GetBool(prefix + "enable_1pc") store.deleteRangeConcurrency = drc return store.initialize(pdAddrs) } func (store *TikvStore) initialize(pdAddrs []string) error { - client, err := tikv.NewTxnClient(pdAddrs) + client, err := txnkv.NewClient(pdAddrs) store.client = client return err } @@ -298,6 +297,9 @@ func (store *TikvStore) 
BeginTransaction(ctx context.Context) (context.Context, if err != nil { return ctx, err } + if store.onePC { + tx.SetEnable1PC(store.onePC) + } return context.WithValue(ctx, "tx", tx), nil } @@ -344,6 +346,9 @@ func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) { if err != nil { return nil, err } + if store.onePC { + txn.SetEnable1PC(store.onePC) + } return &TxnWrapper{txn, false}, nil } diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go index 3fed7e045..dcc9acf8c 100644 --- a/weed/filer/tikv/tikv_store_kv.go +++ b/weed/filer/tikv/tikv_store_kv.go @@ -1,5 +1,3 @@ -// +build tikv - package tikv import ( diff --git a/weed/filer/ydb/doc.go b/weed/filer/ydb/doc.go new file mode 100644 index 000000000..6ade3a8d8 --- /dev/null +++ b/weed/filer/ydb/doc.go @@ -0,0 +1,9 @@ +/* + +Package ydb is for YDB filer store. + +The referenced "github.com/ydb-platform/ydb-go-sdk/v3" library is too big when compiled. +So this is only compiled in "make full_install". + +*/ +package ydb diff --git a/weed/filer/ydb/readme.md b/weed/filer/ydb/readme.md new file mode 100644 index 000000000..b617461fd --- /dev/null +++ b/weed/filer/ydb/readme.md @@ -0,0 +1,27 @@ +## YDB + +database: https://github.com/ydb-platform/ydb + +go driver: https://github.com/ydb-platform/ydb-go-sdk + +options: + +``` +[ydb] +enabled=true +dsn=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db +prefix="seaweedfs" +useBucketPrefix=true +poolSizeLimit=50 +dialTimeOut = 10 +``` + +Authenticate produced with one of next environment variables: + * `YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=<path/to/sa_key_file>` — used service account key file by path + * `YDB_ANONYMOUS_CREDENTIALS="1"` — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation + * `YDB_METADATA_CREDENTIALS="1"` — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function + * `YDB_ACCESS_TOKEN_CREDENTIALS=<access_token>` — used for authenticate to YDB with short-life access token. 
for example an IAM token + * `YDB_CONNECTION_STRING="grpcs://endpoint/?database=database"` + + * To test against a local dev database, run: +`make dev_ydb` diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go new file mode 100644 index 000000000..f1db9a143 --- /dev/null +++ b/weed/filer/ydb/ydb_queries.go @@ -0,0 +1,72 @@ +//go:build ydb +// +build ydb + +package ydb + +import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + +const ( + upsertQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $name AS Utf8; + DECLARE $meta AS String; + DECLARE $expire_at AS Optional<uint32>; + + UPSERT INTO ` + asql.DEFAULT_TABLE + ` + (dir_hash, name, directory, meta, expire_at) + VALUES + ($dir_hash, $name, $directory, $meta, $expire_at);` + + deleteQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $name AS Utf8; + + DELETE FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND name = $name;` + + findQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $name AS Utf8; + + SELECT meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND name = $name;` + + deleteFolderChildrenQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + + DELETE FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory;` + + listDirectoryQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $start_name AS Utf8; + DECLARE $prefix AS Utf8; + DECLARE $limit AS Uint64; + + SELECT name, meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory and name > $start_name and name LIKE $prefix + ORDER BY name ASC LIMIT $limit;` + + listInclusiveDirectoryQuery = ` + PRAGMA TablePathPrefix("%v"); + DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; + DECLARE $start_name AS Utf8; + DECLARE $prefix AS Utf8; + DECLARE $limit AS Uint64; + + SELECT name, meta + FROM ` + asql.DEFAULT_TABLE + ` + WHERE dir_hash = $dir_hash AND directory = $directory and name >= $start_name and name LIKE $prefix + ORDER BY name ASC LIMIT $limit;` +) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go new file mode 100644 index 000000000..1e3a55a09 --- /dev/null +++ b/weed/filer/ydb/ydb_store.go @@ -0,0 +1,413 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/sugar" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "os" + "path" + "strings" + "sync" + "time" +) + +const ( + defaultDialTimeOut = 10 +) + +var ( + roTX = table.TxControl( + table.BeginTx(table.WithOnlineReadOnly()), + table.CommitTx(), + ) + rwTX = table.DefaultTxControl() +) + +type YdbStore struct { + DB ydb.Connection + dirBuckets string + tablePathPrefix string + SupportBucketTable bool + dbs map[string]bool + dbsLock sync.Mutex +} + +func 
init() { + filer.Stores = append(filer.Stores, &YdbStore{}) +} + +func (store *YdbStore) GetName() string { + return "ydb" +} + +func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString("filer.options.buckets_folder"), + configuration.GetString(prefix+"dsn"), + configuration.GetString(prefix+"prefix"), + configuration.GetBool(prefix+"useBucketPrefix"), + configuration.GetInt(prefix+"dialTimeOut"), + configuration.GetInt(prefix+"poolSizeLimit"), + ) +} + +func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int) (err error) { + store.dirBuckets = dirBuckets + store.SupportBucketTable = useBucketPrefix + if store.SupportBucketTable { + glog.V(0).Infof("enabled BucketPrefix") + } + store.dbs = make(map[string]bool) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if dialTimeOut == 0 { + dialTimeOut = defaultDialTimeOut + } + opts := []ydb.Option{ + ydb.WithDialTimeout(time.Duration(dialTimeOut) * time.Second), + environ.WithEnvironCredentials(ctx), + } + if poolSizeLimit > 0 { + opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit)) + } + if dsn == "" { + dsn = os.Getenv("YDB_CONNECTION_STRING") + } + store.DB, err = ydb.Open(ctx, dsn, opts...) + if err != nil { + if store.DB != nil { + _ = store.DB.Close(ctx) + store.DB = nil + } + return fmt.Errorf("can not connect to %s error: %v", dsn, err) + } + + store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) + if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { + return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) + } + + if err = store.createTable(ctx, store.tablePathPrefix); err != nil { + glog.Errorf("createTable %s: %v", store.tablePathPrefix, err) + } + return err +} + +func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { + var res result.Result + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + res, err = tx.Execute(ctx, *query, params) + if err != nil { + return fmt.Errorf("execute transaction: %v", err) + } + } else { + err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + _, res, err = s.Execute(ctx, tc, *query, params) + if err != nil { + return fmt.Errorf("execute statement: %v", err) + } + return nil + }) + } + if err != nil { + return err + } + if res != nil { + defer func() { _ = res.Close() }() + if processResultFunc != nil { + if err = processResultFunc(res); err != nil { + return fmt.Errorf("process result: %v", err) + } + } + } + return err +} + +func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta} + return store.doTxOrDB(ctx, withPragma(tablePathPrefix, upsertQuery), fileMeta.queryParameters(entry.TtlSec), rwTX, nil) +} + +func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + return 
store.insertOrUpdateEntry(ctx, entry) +} + +func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry) +} + +func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + var data []byte + entryFound := false + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, findQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$name", types.UTF8Value(name))) + + err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { + if !res.NextResultSet(ctx) || !res.HasNextRow() { + return nil + } + for res.NextRow() { + if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { + return fmt.Errorf("scanNamed %s : %v", fullpath, err) + } + entryFound = true + return nil + } + return res.Err() + }) + if err != nil { + return nil, err + } + if !entryFound { + return nil, filer_pb.ErrNotFound + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + return nil, fmt.Errorf("decode %s : %v", fullpath, err) + } + + return entry, nil +} + +func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, deleteQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$name", types.UTF8Value(name))) + + return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) +} + +func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + dir, _ := fullpath.DirAndName() + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + query := withPragma(tablePathPrefix, deleteFolderChildrenQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir))) + + return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) +} + +func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + dir := string(dirPath) + tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) + var query *string + if includeStartFile { + query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery) + } else { + query = withPragma(tablePathPrefix, listDirectoryQuery) + } + truncated := true + eachEntryFuncDidNotBreak := true + entryCount := int64(0) + for truncated && eachEntryFuncDidNotBreak { + if lastFileName != "" { + startFileName = lastFileName + if includeStartFile { + query = withPragma(tablePathPrefix, listDirectoryQuery) + } + } + restLimit := limit - entryCount + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir)), + table.ValueParam("$start_name", types.UTF8Value(startFileName)), + table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), + table.ValueParam("$limit", types.Uint64Value(uint64(restLimit))), + ) + err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { + var name string + var data []byte + if !res.NextResultSet(ctx) || !res.HasNextRow() { + truncated = false + return nil + } + truncated = res.CurrentResultSet().Truncated() + for res.NextRow() { + if err := res.ScanNamed( + named.OptionalWithDefault("name", &name), + named.OptionalWithDefault("meta", &data)); err != nil { + return fmt.Errorf("list scanNamed %s : %v", dir, err) + } + lastFileName = name + entry := &filer.Entry{ + FullPath: util.NewFullPath(dir, name), + } + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + return fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + if !eachEntryFunc(entry) { + eachEntryFuncDidNotBreak = false + break + } + entryCount += 1 + } + return res.Err() + }) + } + if err != nil { + return lastFileName, err + } + return lastFileName, nil +} + +func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + session, err := store.DB.Table().CreateSession(ctx) + if err != nil { + return ctx, err + } + tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite())) + if err != nil { + return ctx, err + } + return context.WithValue(ctx, "tx", tx), nil +} + +func (store *YdbStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + _, err := tx.CommitTx(ctx) + return err + } + return nil +} + +func (store *YdbStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + return tx.Rollback(ctx) + } + return nil +} + +func (store *YdbStore) Shutdown() { + _ = store.DB.Close(context.Background()) +} + +func (store *YdbStore) CanDropWholeBucket() bool { + return store.SupportBucketTable +} + +func (store *YdbStore) OnBucketCreation(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if err := store.createTable(context.Background(), + path.Join(store.tablePathPrefix, bucket)); err != nil { + glog.Errorf("createTable %s: %v", bucket, err) + } + + if store.dbs == nil { + return + } + store.dbs[bucket] = true +} + +func (store *YdbStore) OnBucketDeletion(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if err := store.deleteTable(context.Background(), + path.Join(store.tablePathPrefix, bucket)); err != nil { + glog.Errorf("deleteTable %s: %v", bucket, err) + } + + if store.dbs == nil { + return + } + delete(store.dbs, bucket) +} + +func (store *YdbStore) createTable(ctx context.Context, prefix string) error { + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...) 
+ }) +} + +func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { + if !store.SupportBucketTable { + return nil + } + if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) + }); err != nil { + return err + } + glog.V(4).Infof("deleted table %s", prefix) + + return nil +} + +func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) { + tablePathPrefix = &store.tablePathPrefix + shortDir = dir + if !store.SupportBucketTable { + return + } + + prefixBuckets := store.dirBuckets + "/" + if strings.HasPrefix(*dir, prefixBuckets) { + // detect bucket + bucketAndDir := (*dir)[len(prefixBuckets):] + var bucket string + if t := strings.Index(bucketAndDir, "/"); t > 0 { + bucket = bucketAndDir[:t] + } else if t < 0 { + bucket = bucketAndDir + } + if bucket == "" { + return + } + + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket) + if _, found := store.dbs[bucket]; !found { + if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil { + store.dbs[bucket] = true + glog.V(4).Infof("created table %s", tablePathPrefixWithBucket) + } else { + glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err) + } + } + tablePathPrefix = &tablePathPrefixWithBucket + } + return +} diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go new file mode 100644 index 000000000..72bbfff42 --- /dev/null +++ b/weed/filer/ydb/ydb_store_kv.go @@ -0,0 +1,75 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" +) + +func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + fileMeta := FileMeta{dirHash, name, dirStr, value} + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, upsertQuery), + fileMeta.queryParameters(0)) + if err != nil { + return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) + } + return nil + }) +} + +func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + valueFound := false + err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery), + table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(dirHash)), + table.ValueParam("$name", types.UTF8Value(name)))) + if err != nil { + return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) + } + defer func() { _ = res.Close() }() + if !res.NextResultSet(ctx) || !res.HasNextRow() { + return nil + } + for res.NextRow() { + if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil { + return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) + } + valueFound = true + return nil + } + return res.Err() + }) + + if 
!valueFound { + return nil, filer.ErrKvNotFound + } + + return value, nil +} + +func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, deleteQuery), + table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(dirHash)), + table.ValueParam("$name", types.UTF8Value(name)))) + if err != nil { + return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err) + } + return nil + }) + +} diff --git a/weed/filer/ydb/ydb_store_test.go b/weed/filer/ydb/ydb_store_test.go new file mode 100644 index 000000000..cb3c77018 --- /dev/null +++ b/weed/filer/ydb/ydb_store_test.go @@ -0,0 +1,19 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "github.com/chrislusf/seaweedfs/weed/filer/store_test" + "testing" +) + +func TestStore(t *testing.T) { + // To set up a local test environment, + // run "make test_ydb" under the docker folder. + if false { + store := &YdbStore{} + store.initialize("/buckets", "grpc://localhost:2136/?database=local", "seaweedfs", true, 10, 50) + store_test.TestFilerStore(t, store) + } +} diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go new file mode 100644 index 000000000..4e5100236 --- /dev/null +++ b/weed/filer/ydb/ydb_types.go @@ -0,0 +1,56 @@ +//go:build ydb +// +build ydb + +package ydb + +import ( + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" +) + +type FileMeta struct { + DirHash int64 `ydb:"type:int64"` + Name string `ydb:"type:utf8"` + Directory string `ydb:"type:utf8"` + Meta []byte `ydb:"type:string"` +} + +type FileMetas []FileMeta + +func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters { + var expireAtValue types.Value + if ttlSec > 0 { + expireAtValue = types.Uint32Value(uint32(ttlSec)) + } else { + expireAtValue = types.NullValue(types.TypeUint32) + } + return table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), + table.ValueParam("$directory", types.UTF8Value(fm.Directory)), + table.ValueParam("$name", types.UTF8Value(fm.Name)), + table.ValueParam("$meta", types.StringValue(fm.Meta)), + table.ValueParam("$expire_at", expireAtValue)) +} + +func createTableOptions() []options.CreateTableOption { + columnUnit := options.TimeToLiveUnitSeconds + return []options.CreateTableOption{ + options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), + options.WithColumn("directory", types.Optional(types.TypeUTF8)), + options.WithColumn("name", types.Optional(types.TypeUTF8)), + options.WithColumn("meta", types.Optional(types.TypeString)), + options.WithColumn("expire_at", types.Optional(types.TypeUint32)), + options.WithPrimaryKeyColumn("dir_hash", "name"), + options.WithTimeToLiveSettings(options.TimeToLiveSettings{ + ColumnName: "expire_at", + ColumnUnit: &columnUnit, + Mode: options.TimeToLiveModeValueSinceUnixEpoch}, + ), + } +} +func withPragma(prefix *string, query string) *string { + queryWithPragma := fmt.Sprintf(query, *prefix) + return &queryWithPragma +}
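Finally, a small sketch of how these YDB query templates get their table path: every statement starts with PRAGMA TablePathPrefix("%v"), and withPragma substitutes the prefix resolved by getPrefix (which appends the bucket name when useBucketPrefix is enabled). The values below are hypothetical, and the snippet assumes it lives inside package ydb:

```go
// Hypothetical prefix; in the store it is built by getPrefix from
// store.tablePathPrefix plus, for bucket paths, the bucket name.
prefix := "/ru/home/username/db/seaweedfs/my-bucket"
query := withPragma(&prefix, upsertQuery)
// *query now begins:
//   PRAGMA TablePathPrefix("/ru/home/username/db/seaweedfs/my-bucket");
//   DECLARE $dir_hash AS int64; ...
// so the same template serves the default table and every per-bucket table.
```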
