about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Chris Lu <chris.lu@gmail.com> 2020-09-03 11:00:20 -0700
committer: Chris Lu <chris.lu@gmail.com> 2020-09-03 11:00:20 -0700
commit: b8f32bcab94a23cc5cb92f32fdd655a5b55ebb6d (patch)
tree: bde567ba06f243041e97aef31c35ccdce001d132
parent: f76a2b2c8a22c97a5811e0ccf1776043ecc4a0f1 (diff)
download: seaweedfs-b8f32bcab94a23cc5cb92f32fdd655a5b55ebb6d.tar.xz
download: seaweedfs-b8f32bcab94a23cc5cb92f32fdd655a5b55ebb6d.zip
filer: compress stored metadata
-rw-r--r-- weed/filer/abstract_sql/abstract_sql_store.go 8
-rw-r--r-- weed/filer/cassandra/cassandra_store.go 8
-rw-r--r-- weed/filer/etcd/etcd_store.go 12
-rw-r--r-- weed/filer/leveldb/leveldb_store.go 8
-rw-r--r-- weed/filer/leveldb2/leveldb2_store.go 9
-rw-r--r-- weed/filer/mongodb/mongodb_store.go 8
-rw-r--r-- weed/filer/redis/universal_redis_store.go 6
-rw-r--r-- weed/filer/redis2/universal_redis_store.go 6
-rw-r--r-- weed/shell/command_fs_meta_cat.go 6
-rw-r--r-- weed/util/compression.go 37
10 files changed, 85 insertions, 23 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 368bec973..48b5795c2 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -67,6 +67,10 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
if err != nil {
if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
@@ -126,7 +130,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
entry := &filer.Entry{
FullPath: fullpath,
}
- if err := entry.DecodeAttributesAndChunks(data); err != nil {
+ if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -188,7 +192,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), name),
}
- if err = entry.DecodeAttributesAndChunks(data); err != nil {
+ if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index fd161b1f1..250db629a 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -60,6 +60,10 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+
if err := store.session.Query(
"INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
dir, name, meta, entry.TtlSec).Exec(); err != nil {
@@ -93,7 +97,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data)
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -144,7 +148,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), name),
}
- if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 36db4ac01..634fba1eb 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -76,12 +76,16 @@ func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
key := genKey(entry.DirAndName())
- value, err := entry.EncodeAttributesAndChunks()
+ meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
+ if len(entry.Chunks) > 50 {
+ meta = weed_util.MaybeGzipData(meta)
+ }
+
+ if _, err := store.client.Put(ctx, string(key), string(meta)); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@@ -107,7 +111,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(resp.Kvs[0].Value))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -163,7 +167,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
- if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index eccb760a2..4b8dd5ea9 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -78,6 +78,10 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = weed_util.MaybeGzipData(value)
+ }
+
err = store.db.Put(key, value, nil)
if err != nil {
@@ -109,7 +113,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data)
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData((data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -187,7 +191,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 7a2bdac2e..2ad0dd648 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -85,6 +85,10 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = weed_util.MaybeGzipData(value)
+ }
+
err = store.dbs[partitionId].Put(key, value, nil)
if err != nil {
@@ -117,7 +121,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data)
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -199,8 +203,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
-
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 1fc67931a..b7e855165 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -101,6 +101,10 @@ func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+
c := store.connect.Database(store.database).Collection(store.collectionName)
_, err = c.InsertOne(ctx, Model{
@@ -140,7 +144,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data.Meta)
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -197,7 +201,7 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), data.Name),
}
- if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index cc8819019..0de9924a3 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -40,6 +40,10 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
@@ -76,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks([]byte(data))
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 9e06ff68f..c213b39a8 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -38,6 +38,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@@ -71,7 +75,7 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks([]byte(data))
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index 94ce02596..a097a3a4e 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -70,8 +70,10 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, "%s\n", text)
- bytes, err := proto.Marshal(respLookupEntry.Entry)
- fmt.Fprintf(writer, "chunks %d meta size: %d\n", len(respLookupEntry.Entry.Chunks), len(bytes))
+ bytes, _ := proto.Marshal(respLookupEntry.Entry)
+ gzippedBytes, _ := util.GzipData(bytes)
+ zstdBytes, _ := util.ZstdData(bytes)
+ fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
return nil
diff --git a/weed/util/compression.go b/weed/util/compression.go
index 2881a7bfd..736f76a5e 100644
--- a/weed/util/compression.go
+++ b/weed/util/compression.go
@@ -12,15 +12,44 @@ import (
"github.com/klauspost/compress/zstd"
)
+var(
+ UnsupportedCompression = fmt.Errorf("unsupported compression")
+)
+
+func MaybeGzipData(input []byte) ([]byte) {
+ if IsGzippedContent(input) {
+ return input
+ }
+ gzipped, err := GzipData(input)
+ if err != nil {
+ return input
+ }
+ if len(gzipped) * 10 > len(input) * 9 {
+ return input
+ }
+ return gzipped
+}
+
+func MaybeDecompressData(input []byte) ([]byte) {
+ uncompressed, err := DecompressData(input)
+ if err != nil {
+ if err != UnsupportedCompression {
+ glog.Errorf("decompressed data: %v", err)
+ }
+ return input
+ }
+ return uncompressed
+}
+
func GzipData(input []byte) ([]byte, error) {
buf := new(bytes.Buffer)
w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
if _, err := w.Write(input); err != nil {
- glog.V(2).Infoln("error compressing data:", err)
+ glog.V(2).Infof("error gzip data: %v", err)
return nil, err
}
if err := w.Close(); err != nil {
- glog.V(2).Infoln("error closing compressed data:", err)
+ glog.V(2).Infof("error closing gzipped data: %v", err)
return nil, err
}
return buf.Bytes(), nil
@@ -39,7 +68,7 @@ func DecompressData(input []byte) ([]byte, error) {
if IsZstdContent(input) {
return unzstdData(input)
}
- return input, fmt.Errorf("unsupported compression")
+ return input, UnsupportedCompression
}
func ungzipData(input []byte) ([]byte, error) {
@@ -48,7 +77,7 @@ func ungzipData(input []byte) ([]byte, error) {
defer r.Close()
output, err := ioutil.ReadAll(r)
if err != nil {
- glog.V(2).Infoln("error uncompressing data:", err)
+ glog.V(2).Infof("error ungzip data: %v", err)
}
return output, err
}