aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/scaffold.go13
-rw-r--r--weed/filer2/etcd/etcd_store.go186
-rw-r--r--weed/server/filer_server.go1
3 files changed, 196 insertions, 4 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 062fe0ff8..67e0ae7c9 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -145,6 +145,11 @@ addresses = [
]
password = ""
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+
`
NOTIFICATION_TOML_EXAMPLE = `
@@ -217,22 +222,22 @@ grpcAddress = "localhost:18888"
# all files under this directory tree are replicated.
# this is not a directory on your hard drive, but on your filer.
# i.e., all files with this "prefix" are sent to notification message queue.
-directory = "/buckets"
+directory = "/buckets"
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
+# this is not a directory on your hard drive, but on your filer.
# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
+directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
+# default loads credentials from the shared credentials file (~/.aws/credentials).
enabled = false
aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
new file mode 100644
index 000000000..1b0f928d0
--- /dev/null
+++ b/weed/filer2/etcd/etcd_store.go
@@ -0,0 +1,186 @@
+package etcd
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+ "go.etcd.io/etcd/clientv3"
+)
+
+const (
+ DIR_FILE_SEPARATOR = byte(0x00)
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &EtcdStore{})
+}
+
+type EtcdStore struct {
+ client *clientv3.Client
+}
+
+func (store *EtcdStore) GetName() string {
+ return "etcd"
+}
+
+func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) {
+ servers := configuration.GetString("servers")
+ if servers == "" {
+ servers = "localhost:2379"
+ }
+
+ timeout := configuration.GetString("timeout")
+ if timeout == "" {
+ timeout = "3s"
+ }
+
+ return store.initialize(servers, timeout)
+}
+
+func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
+ glog.Infof("filer store etcd: %s", servers)
+
+ to, err := time.ParseDuration(timeout)
+ if err != nil {
+ return fmt.Errorf("parse timeout %s: %s", timeout, err)
+ }
+
+ store.client, err = clientv3.New(clientv3.Config{
+ Endpoints: strings.Split(servers, ","),
+ DialTimeout: to,
+ })
+ if err != nil {
+ return fmt.Errorf("connect to etcd %s: %s", servers, err)
+ }
+
+ return
+}
+
+func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ key := genKey(entry.DirAndName())
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+ key := genKey(fullpath.DirAndName())
+
+ resp, err := store.client.Get(ctx, string(key))
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ if len(resp.Kvs) == 0 {
+ return nil, filer2.ErrNotFound
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ key := genKey(fullpath.DirAndName())
+
+ if _, err := store.client.Delete(ctx, string(key)); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *EtcdStore) ListDirectoryEntries(
+ ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
+) (entries []*filer2.Entry, err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ resp, err := store.client.Get(ctx, string(directoryPrefix),
+ clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
+ if err != nil {
+ return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ }
+
+ for _, kv := range resp.Kvs {
+ fileName := getNameFromKey(kv.Key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer2.Entry{
+ FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = []byte(dirPath)
+ key = append(key, DIR_FILE_SEPARATOR)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = []byte(string(fullpath))
+ keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+ sepIndex := len(key) - 1
+ for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
+ sepIndex--
+ }
+
+ return string(key[sepIndex+1:])
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index b9e6aa23d..f2dec9346 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -15,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"