about summary refs log tree commit diff
path: root/weed/remote_storage/hdfs/hdfs_storage_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/remote_storage/hdfs/hdfs_storage_client.go')
-rw-r--r--weed/remote_storage/hdfs/hdfs_storage_client.go174
1 files changed, 174 insertions, 0 deletions
diff --git a/weed/remote_storage/hdfs/hdfs_storage_client.go b/weed/remote_storage/hdfs/hdfs_storage_client.go
new file mode 100644
index 000000000..0c5f5a45d
--- /dev/null
+++ b/weed/remote_storage/hdfs/hdfs_storage_client.go
@@ -0,0 +1,174 @@
+package hdfs
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/colinmarc/hdfs/v2"
+ "io"
+ "os"
+ "path"
+)
+
+func init() {
+ remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker)
+}
+
// hdfsRemoteStorageMaker creates HDFS-backed remote storage clients.
type hdfsRemoteStorageMaker struct{}

// HasBucket reports whether this storage type groups objects into buckets.
// It returns false: locations are addressed by path only.
func (s hdfsRemoteStorageMaker) HasBucket() bool {
	return false
}
+
+func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
+ client := &hdfsRemoteStorageClient{
+ conf: conf,
+ }
+
+ options := hdfs.ClientOptions{
+ Addresses: conf.HdfsNamenodes,
+ UseDatanodeHostname: false,
+ }
+
+ if conf.HdfsServicePrincipalName != "" {
+ var err error
+ options.KerberosClient, err = getKerberosClient()
+ if err != nil {
+ return nil, fmt.Errorf("get kerberos authentication: %s", err)
+ }
+ options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName
+
+ if conf.HdfsDataTransferProtection != "" {
+ options.DataTransferProtection = conf.HdfsDataTransferProtection
+ }
+ } else {
+ options.User = conf.HdfsUsername
+ }
+
+ c, err := hdfs.NewClient(options)
+ if err != nil {
+ return nil, err
+ }
+
+ client.client = c
+ return client, nil
+}
+
// hdfsRemoteStorageClient implements remote_storage.RemoteStorageClient
// on top of a connected HDFS client.
type hdfsRemoteStorageClient struct {
	conf   *remote_pb.RemoteConf // the remote configuration this client was created from
	client *hdfs.Client          // live connection to the HDFS namenodes
}

// Compile-time check that the interface is fully implemented.
var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
+
+func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
+
+ return TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
+ children, err := c.client.ReadDir(string(parentDir))
+ if err != nil {
+ return err
+ }
+ for _, child := range children {
+ if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{
+ StorageName: c.conf.Name,
+ LastLocalSyncTsNs: 0,
+ RemoteETag: "",
+ RemoteMtime: child.ModTime().Unix(),
+ RemoteSize: child.Size(),
+ }); err != nil {
+ return nil
+ }
+ }
+ return nil
+ }, util.FullPath(loc.Path), visitFn)
+
+}
+func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
+
+ f, err := c.client.Open(loc.Path)
+ if err != nil {
+ return
+ }
+ defer f.Close()
+ data = make([]byte, size)
+ _, err = f.ReadAt(data, offset)
+
+ return
+
+}
+
+func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
+ return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode))
+}
+
+func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
+
+ dirname := path.Dir(loc.Path)
+
+ // ensure parent directory
+ if err = c.client.MkdirAll(dirname, 0755); err != nil {
+ return
+ }
+
+ // remove existing file
+ info, err := c.client.Stat(loc.Path)
+ if err == nil {
+ err = c.client.Remove(loc.Path)
+ if err != nil {
+ return
+ }
+ }
+
+ // create new file
+ out, err := c.client.Create(loc.Path)
+ if err != nil {
+ return
+ }
+
+ cleanup := func() {
+ if removeErr := c.client.Remove(loc.Path); removeErr != nil {
+ glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr)
+ }
+ }
+
+ if _, err = io.Copy(out, reader); err != nil {
+ cleanup()
+ return
+ }
+
+ if err = out.Close(); err != nil {
+ cleanup()
+ return
+ }
+
+ info, err = c.client.Stat(loc.Path)
+ if err != nil {
+ return
+ }
+
+ return &filer_pb.RemoteEntry{
+ RemoteMtime: info.ModTime().Unix(),
+ RemoteSize: info.Size(),
+ RemoteETag: "",
+ StorageName: c.conf.Name,
+ }, nil
+
+}
+
+func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
+ if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode {
+ if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
+ if err = c.client.Remove(loc.Path); err != nil {
+ return fmt.Errorf("hdfs delete %s: %v", loc.Path, err)
+ }
+ return
+}