diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/scaffold.go | 13 | ||||
| -rw-r--r-- | weed/filer2/etcd/etcd_store.go | 186 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 1 |
3 files changed, 196 insertions, 4 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 062fe0ff8..67e0ae7c9 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -145,6 +145,11 @@ addresses = [ ] password = "" +[etcd] +enabled = false +servers = "localhost:2379" +timeout = "3s" + ` NOTIFICATION_TOML_EXAMPLE = ` @@ -217,22 +222,22 @@ grpcAddress = "localhost:18888" # all files under this directory tree are replicated. # this is not a directory on your hard drive, but on your filer. # i.e., all files with this "prefix" are sent to notification message queue. -directory = "/buckets" +directory = "/buckets" [sink.filer] enabled = false grpcAddress = "localhost:18888" # all replicated files are under this directory tree -# this is not a directory on your hard drive, but on your filer. +# this is not a directory on your hard drive, but on your filer. # i.e., all received files will be "prefixed" to this directory. -directory = "/backup" +directory = "/backup" replication = "" collection = "" ttlSec = 0 [sink.s3] # read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html -# default loads credentials from the shared credentials file (~/.aws/credentials). +# default loads credentials from the shared credentials file (~/.aws/credentials). enabled = false aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go new file mode 100644 index 000000000..1b0f928d0 --- /dev/null +++ b/weed/filer2/etcd/etcd_store.go @@ -0,0 +1,186 @@ +package etcd + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + weed_util "github.com/chrislusf/seaweedfs/weed/util" + "go.etcd.io/etcd/clientv3" +) + +const ( + DIR_FILE_SEPARATOR = byte(0x00) +) + +func init() { + filer2.Stores = append(filer2.Stores, &EtcdStore{}) +} + +type EtcdStore struct { + client *clientv3.Client +} + +func (store *EtcdStore) GetName() string { + return "etcd" +} + +func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) { + servers := configuration.GetString("servers") + if servers == "" { + servers = "localhost:2379" + } + + timeout := configuration.GetString("timeout") + if timeout == "" { + timeout = "3s" + } + + return store.initialize(servers, timeout) +} + +func (store *EtcdStore) initialize(servers string, timeout string) (err error) { + glog.Infof("filer store etcd: %s", servers) + + to, err := time.ParseDuration(timeout) + if err != nil { + return fmt.Errorf("parse timeout %s: %s", timeout, err) + } + + store.client, err = clientv3.New(clientv3.Config{ + Endpoints: strings.Split(servers, ","), + DialTimeout: to, + }) + if err != nil { + return fmt.Errorf("connect to etcd %s: %s", servers, err) + } + + return +} + +func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *EtcdStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *EtcdStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + key := genKey(entry.DirAndName()) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if _, err := store.client.Put(ctx, string(key), string(value)); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + return nil +} + +func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { + key := genKey(fullpath.DirAndName()) + + resp, err := store.client.Get(ctx, string(key)) + if err != nil { + return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + } + + if len(resp.Kvs) == 0 { + return nil, filer2.ErrNotFound + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { + key := genKey(fullpath.DirAndName()) + + if _, err := store.client.Delete(ctx, string(key)); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *EtcdStore) ListDirectoryEntries( + ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int, +) (entries []*filer2.Entry, err error) { + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + resp, err := store.client.Get(ctx, string(directoryPrefix), + clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + if err != nil { + return nil, fmt.Errorf("list %s : %v", fullpath, err) + } + + for _, kv := range resp.Kvs { + fileName := getNameFromKey(kv.Key) + if fileName == "" { + continue + } + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entry := &filer2.Entry{ + FullPath: filer2.NewFullPath(string(fullpath), fileName), + } + if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + + return entries, err +} + +func genKey(dirPath, fileName string) (key []byte) { + key = []byte(dirPath) + key = append(key, DIR_FILE_SEPARATOR) + key = append(key, []byte(fileName)...) + return key +} + +func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = []byte(string(fullpath)) + keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func getNameFromKey(key []byte) string { + sepIndex := len(key) - 1 + for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR { + sepIndex-- + } + + return string(key[sepIndex+1:]) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b9e6aa23d..f2dec9346 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -15,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" |
