diff options
Diffstat (limited to 'weed/remote_storage/hdfs/hdfs_storage_client.go')
| -rw-r--r-- | weed/remote_storage/hdfs/hdfs_storage_client.go | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/weed/remote_storage/hdfs/hdfs_storage_client.go b/weed/remote_storage/hdfs/hdfs_storage_client.go new file mode 100644 index 000000000..0c5f5a45d --- /dev/null +++ b/weed/remote_storage/hdfs/hdfs_storage_client.go @@ -0,0 +1,174 @@ +package hdfs + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/colinmarc/hdfs/v2" + "io" + "os" + "path" +) + +func init() { + remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker) +} + +type hdfsRemoteStorageMaker struct{} + +func (s hdfsRemoteStorageMaker) HasBucket() bool { + return false +} + +func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { + client := &hdfsRemoteStorageClient{ + conf: conf, + } + + options := hdfs.ClientOptions{ + Addresses: conf.HdfsNamenodes, + UseDatanodeHostname: false, + } + + if conf.HdfsServicePrincipalName != "" { + var err error + options.KerberosClient, err = getKerberosClient() + if err != nil { + return nil, fmt.Errorf("get kerberos authentication: %s", err) + } + options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName + + if conf.HdfsDataTransferProtection != "" { + options.DataTransferProtection = conf.HdfsDataTransferProtection + } + } else { + options.User = conf.HdfsUsername + } + + c, err := hdfs.NewClient(options) + if err != nil { + return nil, err + } + + client.client = c + return client, nil +} + +type hdfsRemoteStorageClient struct { + conf *remote_pb.RemoteConf + client *hdfs.Client +} + +var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{}) + +func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { + + return TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error { + children, err := c.client.ReadDir(string(parentDir)) + if err != nil { + return err + } + for _, child := range children { + if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{ + StorageName: c.conf.Name, + LastLocalSyncTsNs: 0, + RemoteETag: "", + RemoteMtime: child.ModTime().Unix(), + RemoteSize: child.Size(), + }); err != nil { + return nil + } + } + return nil + }, util.FullPath(loc.Path), visitFn) + +} +func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { + + f, err := c.client.Open(loc.Path) + if err != nil { + return + } + defer f.Close() + data = make([]byte, size) + _, err = f.ReadAt(data, offset) + + return + +} + +func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { + return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode)) +} + +func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { + + dirname := path.Dir(loc.Path) + + // ensure parent directory + if err = c.client.MkdirAll(dirname, 0755); err != nil { + return + } + + // remove existing file + info, err := c.client.Stat(loc.Path) + if err == nil { + err = c.client.Remove(loc.Path) + if err != nil { + return + } + } + + // create new file + out, err := c.client.Create(loc.Path) + if err != nil { + return + } + + cleanup := func() { + if removeErr := c.client.Remove(loc.Path); removeErr != nil { + glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr) + } + } + + if _, err = io.Copy(out, reader); err != nil { + cleanup() + return + } + + if err = out.Close(); err != nil { + cleanup() + return + } + + info, err = c.client.Stat(loc.Path) + if err != nil { + return + } + + return &filer_pb.RemoteEntry{ + RemoteMtime: info.ModTime().Unix(), + RemoteSize: info.Size(), + RemoteETag: "", + StorageName: c.conf.Name, + }, nil + +} + +func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { + if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode { + if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil { + return err + } + } + return nil +} +func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) { + if err = c.client.Remove(loc.Path); err != nil { + return fmt.Errorf("hdfs delete %s: %v", loc.Path, err) + } + return +} |
