aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-12-24 12:10:35 -0800
committerChris Lu <chris.lu@gmail.com>2020-12-24 12:10:35 -0800
commit0e016bc7bd3d08d4c843388d6af55802ebec0dad (patch)
treebc79b9bec11b0c5b29dd98cf009fe95eb16170b7
parent75c6edba9e9f09ff102a059c46a55c16a8e9063e (diff)
downloadseaweedfs-0e016bc7bd3d08d4c843388d6af55802ebec0dad.tar.xz
seaweedfs-0e016bc7bd3d08d4c843388d6af55802ebec0dad.zip
hbase add ttl
-rw-r--r--weed/filer/hbase/hbase_store.go2
-rw-r--r--weed/filer/hbase/hbase_store_kv.go16
2 files changed, 13 insertions, 5 deletions
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index 03aecfd2b..6b0ad58b9 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -79,7 +79,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er
value = util.MaybeGzipData(value)
}
- return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value)
+ return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
}
func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go
index 8f5794ac1..26bf763e2 100644
--- a/weed/filer/hbase/hbase_store_kv.go
+++ b/weed/filer/hbase/hbase_store_kv.go
@@ -4,13 +4,14 @@ import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/tsuna/gohbase/hrpc"
+ "time"
)
const(
COLUMN_NAME = "a"
)
func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- return store.doPut(ctx, store.cfKv, key, value)
+ return store.doPut(ctx, store.cfKv, key, value, 0)
}
func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
@@ -21,10 +22,17 @@ func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) {
return store.doDelete(ctx, store.cfKv, key)
}
-func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte) (err error) {
+func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) {
+ if ttlSecond > 0 {
+ return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second))
+ }
+ return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal))
+}
+
+func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) {
values := map[string]map[string][]byte{cf: map[string][]byte{}}
values[cf][COLUMN_NAME] = value
- putRequest, err := hrpc.NewPut(ctx, store.table, key, values)
+ putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...)
if err != nil {
return err
}
@@ -55,7 +63,7 @@ func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (valu
func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) {
values := map[string]map[string][]byte{cf: map[string][]byte{}}
values[cf][COLUMN_NAME] = nil
- deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values)
+ deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal))
if err != nil {
return err
}