aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/scaffold.go5
-rw-r--r--weed/filer/hbase/hbase_store.go227
-rw-r--r--weed/filer/hbase/hbase_store_kv.go75
-rw-r--r--weed/server/filer_server.go1
4 files changed, 308 insertions, 0 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 04a988027..6cfd46427 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -141,6 +141,11 @@ password=""
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
+[hbase]
+enabled = false
+zkquorum = ""
+table = "seaweedfs"
+
[redis2]
enabled = false
address = "localhost:6379"
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
new file mode 100644
index 000000000..6b0ad58b9
--- /dev/null
+++ b/weed/filer/hbase/hbase_store.go
@@ -0,0 +1,227 @@
+package hbase
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tsuna/gohbase"
+ "github.com/tsuna/gohbase/hrpc"
+ "io"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &HbaseStore{})
+}
+
+type HbaseStore struct {
+ Client gohbase.Client
+ table []byte
+ cfKv string
+ cfMetaDir string
+ column string
+}
+
+func (store *HbaseStore) GetName() string {
+ return "hbase"
+}
+
+func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"zkquorum"),
+ configuration.GetString(prefix+"table"),
+ )
+}
+
+func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
+ store.Client = gohbase.NewClient(zkquorum)
+ store.table = []byte(table)
+ store.cfKv = "kv"
+ store.cfMetaDir = "meta"
+ store.column = "a"
+
+ // check table exists
+ key := "whatever"
+ headers := map[string][]string{store.cfMetaDir: nil}
+ get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
+ if err != nil {
+ return fmt.Errorf("NewGet returned an error: %v", err)
+ }
+ _, err = store.Client.Get(get)
+ if err != gohbase.TableNotFound {
+ return nil
+ }
+
+ // create table
+ adminClient := gohbase.NewAdminClient(zkquorum)
+ cFamilies := []string{store.cfKv, store.cfMetaDir}
+ cf := make(map[string]map[string]string, len(cFamilies))
+ for _, f := range cFamilies {
+ cf[f] = nil
+ }
+ ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
+ if err := adminClient.CreateTable(ct); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
+ return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
+}
+
+func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
+ value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
+ if err != nil {
+ if err == filer.ErrKvNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ return nil, err
+ }
+
+ entry = &filer.Entry{
+ FullPath: path,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+ return entry, nil
+}
+
+func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
+ return store.doDelete(ctx, store.cfMetaDir, []byte(path))
+}
+
+func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
+
+ family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
+ expectedPrefix := []byte(path.Child(""))
+ scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
+ if err != nil {
+ return err
+ }
+
+ scanner := store.Client.Scan(scan)
+ defer scanner.Close()
+ for {
+ res, err := scanner.Next()
+ if err != nil {
+ break
+ }
+ if len(res.Cells) == 0 {
+ continue
+ }
+ cell := res.Cells[0]
+
+ if !bytes.HasPrefix(cell.Row, expectedPrefix) {
+ break
+ }
+ fullpath := util.FullPath(cell.Row)
+ dir, _ := fullpath.DirAndName()
+ if dir != string(path) {
+ continue
+ }
+
+ err = store.doDelete(ctx, store.cfMetaDir, cell.Row)
+ if err != nil {
+ break
+ }
+
+ }
+ return
+}
+
+func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+}
+
+func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) {
+ family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
+ expectedPrefix := []byte(dirPath.Child(prefix))
+ scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
+ if err != nil {
+ return nil, err
+ }
+
+ var entries []*filer.Entry
+ scanner := store.Client.Scan(scan)
+ defer scanner.Close()
+ for {
+ res, err := scanner.Next()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return entries, err
+ }
+ if len(res.Cells) == 0 {
+ continue
+ }
+ cell := res.Cells[0]
+
+ if !bytes.HasPrefix(cell.Row, expectedPrefix) {
+ break
+ }
+
+ fullpath := util.FullPath(cell.Row)
+ dir, fileName := fullpath.DirAndName()
+ if dir != string(dirPath) {
+ continue
+ }
+
+ value := cell.Value
+
+ if fileName == startFileName && !includeStartFile {
+ continue
+ }
+
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: fullpath,
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, nil
+}
+
+func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+
+func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *HbaseStore) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go
new file mode 100644
index 000000000..26bf763e2
--- /dev/null
+++ b/weed/filer/hbase/hbase_store_kv.go
@@ -0,0 +1,75 @@
+package hbase
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/tsuna/gohbase/hrpc"
+ "time"
+)
+
+const(
+ COLUMN_NAME = "a"
+)
+func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ return store.doPut(ctx, store.cfKv, key, value, 0)
+}
+
+func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ return store.doGet(ctx, store.cfKv, key)
+}
+
+func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ return store.doDelete(ctx, store.cfKv, key)
+}
+
+func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) {
+ if ttlSecond > 0 {
+ return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second))
+ }
+ return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal))
+}
+
+func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) {
+ values := map[string]map[string][]byte{cf: map[string][]byte{}}
+ values[cf][COLUMN_NAME] = value
+ putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...)
+ if err != nil {
+ return err
+ }
+ _, err = store.Client.Put(putRequest)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (value []byte, err error) {
+ family := map[string][]string{cf: {COLUMN_NAME}}
+ getRequest, err := hrpc.NewGet(context.Background(), store.table, key, hrpc.Families(family))
+ if err != nil {
+ return nil, err
+ }
+ getResp, err := store.Client.Get(getRequest)
+ if err != nil {
+ return nil, err
+ }
+ if len(getResp.Cells) == 0 {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return getResp.Cells[0].Value, nil
+}
+
+func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) {
+ values := map[string]map[string][]byte{cf: map[string][]byte{}}
+ values[cf][COLUMN_NAME] = nil
+ deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal))
+ if err != nil {
+ return err
+ }
+ _, err = store.Client.Delete(deleteRequest)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2991d14ab..2da129ab2 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -23,6 +23,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"