diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-08-29 18:41:29 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-08-29 18:41:29 -0700 |
| commit | 001a472057f01b3ac2d3edb59b3d5fb0a141cddd (patch) | |
| tree | b9bac6f68ab44cb2d0fee5df0db69ba589ff69b0 /weed/remote_storage | |
| parent | 05d27741179b199608d0c5c5373bf3c8939c1697 (diff) | |
| download | seaweedfs-001a472057f01b3ac2d3edb59b3d5fb0a141cddd.tar.xz seaweedfs-001a472057f01b3ac2d3edb59b3d5fb0a141cddd.zip | |
cloud mount: remote storage support hdfs
Diffstat (limited to 'weed/remote_storage')
| -rw-r--r-- | weed/remote_storage/azure/azure_storage_client.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/gcs/gcs_storage_client.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/hdfs/hdfs_kerberos.go | 55 | ||||
| -rw-r--r-- | weed/remote_storage/hdfs/hdfs_storage_client.go | 174 | ||||
| -rw-r--r-- | weed/remote_storage/hdfs/traverse_bfs.go | 63 | ||||
| -rw-r--r-- | weed/remote_storage/remote_storage.go | 42 | ||||
| -rw-r--r-- | weed/remote_storage/s3/aliyun.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/s3/backblaze.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/s3/baidu.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/s3/s3_storage_client.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/s3/tencent.go | 4 | ||||
| -rw-r--r-- | weed/remote_storage/s3/wasabi.go | 4 |
12 files changed, 365 insertions, 1 deletions
diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go index c2e1416a5..21b8606c3 100644 --- a/weed/remote_storage/azure/azure_storage_client.go +++ b/weed/remote_storage/azure/azure_storage_client.go @@ -22,6 +22,10 @@ func init() { type azureRemoteStorageMaker struct{} +func (s azureRemoteStorageMaker) HasBucket() bool { + return true +} + func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &azureRemoteStorageClient{ diff --git a/weed/remote_storage/gcs/gcs_storage_client.go b/weed/remote_storage/gcs/gcs_storage_client.go index 44d41f4fd..828d62978 100644 --- a/weed/remote_storage/gcs/gcs_storage_client.go +++ b/weed/remote_storage/gcs/gcs_storage_client.go @@ -22,6 +22,10 @@ func init() { type gcsRemoteStorageMaker struct{} +func (s gcsRemoteStorageMaker) HasBucket() bool { + return true +} + func (s gcsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &gcsRemoteStorageClient{ conf: conf, diff --git a/weed/remote_storage/hdfs/hdfs_kerberos.go b/weed/remote_storage/hdfs/hdfs_kerberos.go new file mode 100644 index 000000000..50abc0ad5 --- /dev/null +++ b/weed/remote_storage/hdfs/hdfs_kerberos.go @@ -0,0 +1,55 @@ +package hdfs + +import ( + "fmt" + "os" + "os/user" + "strings" + + krb "github.com/jcmturner/gokrb5/v8/client" + "github.com/jcmturner/gokrb5/v8/config" + "github.com/jcmturner/gokrb5/v8/credentials" +) + +// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go +func getKerberosClient() (*krb.Client, error) { + configPath := os.Getenv("KRB5_CONFIG") + if configPath == "" { + configPath = "/etc/krb5.conf" + } + + cfg, err := config.Load(configPath) + if err != nil { + return nil, err + } + + // Determine the ccache location from the environment, falling back to the + // default location. + ccachePath := os.Getenv("KRB5CCNAME") + if strings.Contains(ccachePath, ":") { + if strings.HasPrefix(ccachePath, "FILE:") { + ccachePath = strings.SplitN(ccachePath, ":", 2)[1] + } else { + return nil, fmt.Errorf("unusable ccache: %s", ccachePath) + } + } else if ccachePath == "" { + u, err := user.Current() + if err != nil { + return nil, err + } + + ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid) + } + + ccache, err := credentials.LoadCCache(ccachePath) + if err != nil { + return nil, err + } + + client, err := krb.NewFromCCache(ccache, cfg) + if err != nil { + return nil, err + } + + return client, nil +} diff --git a/weed/remote_storage/hdfs/hdfs_storage_client.go b/weed/remote_storage/hdfs/hdfs_storage_client.go new file mode 100644 index 000000000..0c5f5a45d --- /dev/null +++ b/weed/remote_storage/hdfs/hdfs_storage_client.go @@ -0,0 +1,174 @@ +package hdfs + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/colinmarc/hdfs/v2" + "io" + "os" + "path" +) + +func init() { + remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker) +} + +type hdfsRemoteStorageMaker struct{} + +func (s hdfsRemoteStorageMaker) HasBucket() bool { + return false +} + +func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { + client := &hdfsRemoteStorageClient{ + conf: conf, + } + + options := hdfs.ClientOptions{ + Addresses: conf.HdfsNamenodes, + UseDatanodeHostname: false, + } + + if conf.HdfsServicePrincipalName != "" { + var err error + options.KerberosClient, err = getKerberosClient() + if err != nil { + return nil, fmt.Errorf("get kerberos authentication: %s", err) + } + options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName + + if conf.HdfsDataTransferProtection != "" { + options.DataTransferProtection = conf.HdfsDataTransferProtection + } + } else { + options.User = conf.HdfsUsername + } + + c, err := hdfs.NewClient(options) + if err != nil { + return nil, err + } + + client.client = c + return client, nil +} + +type hdfsRemoteStorageClient struct { + conf *remote_pb.RemoteConf + client *hdfs.Client +} + +var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{}) + +func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { + + return TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error { + children, err := c.client.ReadDir(string(parentDir)) + if err != nil { + return err + } + for _, child := range children { + if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{ + StorageName: c.conf.Name, + LastLocalSyncTsNs: 0, + RemoteETag: "", + RemoteMtime: child.ModTime().Unix(), + RemoteSize: child.Size(), + }); err != nil { + return nil + } + } + return nil + }, util.FullPath(loc.Path), visitFn) + +} +func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { + + f, err := c.client.Open(loc.Path) + if err != nil { + return + } + defer f.Close() + data = make([]byte, size) + _, err = f.ReadAt(data, offset) + + return + +} + +func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { + return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode)) +} + +func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { + + dirname := path.Dir(loc.Path) + + // ensure parent directory + if err = c.client.MkdirAll(dirname, 0755); err != nil { + return + } + + // remove existing file + info, err := c.client.Stat(loc.Path) + if err == nil { + err = c.client.Remove(loc.Path) + if err != nil { + return + } + } + + // create new file + out, err := c.client.Create(loc.Path) + if err != nil { + return + } + + cleanup := func() { + if removeErr := c.client.Remove(loc.Path); removeErr != nil { + glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr) + } + } + + if _, err = io.Copy(out, reader); err != nil { + cleanup() + return + } + + if err = out.Close(); err != nil { + cleanup() + return + } + + info, err = c.client.Stat(loc.Path) + if err != nil { + return + } + + return &filer_pb.RemoteEntry{ + RemoteMtime: info.ModTime().Unix(), + RemoteSize: info.Size(), + RemoteETag: "", + StorageName: c.conf.Name, + }, nil + +} + +func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { + if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode { + if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil { + return err + } + } + return nil +} +func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) { + if err = c.client.Remove(loc.Path); err != nil { + return fmt.Errorf("hdfs delete %s: %v", loc.Path, err) + } + return +} diff --git a/weed/remote_storage/hdfs/traverse_bfs.go b/weed/remote_storage/hdfs/traverse_bfs.go new file mode 100644 index 000000000..755771283 --- /dev/null +++ b/weed/remote_storage/hdfs/traverse_bfs.go @@ -0,0 +1,63 @@ +package hdfs + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" + "time" +) + +type ListDirectoryFunc func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error + +func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc) (err error) { + K := 5 + + var dirQueueWg sync.WaitGroup + dirQueue := util.NewQueue() + dirQueueWg.Add(1) + dirQueue.Enqueue(parentPath) + var isTerminating bool + + for i := 0; i < K; i++ { + go func() { + for { + if isTerminating { + break + } + t := dirQueue.Dequeue() + if t == nil { + time.Sleep(329 * time.Millisecond) + continue + } + dir := t.(util.FullPath) + processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg) + if processErr != nil { + err = processErr + } + dirQueueWg.Done() + } + }() + } + + dirQueueWg.Wait() + isTerminating = true + return + +} + +func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) (error) { + + return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { + if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil { + return err + } + if !isDirectory { + return nil + } + dirQueueWg.Add(1) + dirQueue.Enqueue(parentPath.Child(name)) + return nil + }) + +} diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go index c9bef8c9b..a4caef0d4 100644 --- a/weed/remote_storage/remote_storage.go +++ b/weed/remote_storage/remote_storage.go @@ -9,7 +9,18 @@ import ( "sync" ) -func ParseLocation(remote string) (loc *remote_pb.RemoteStorageLocation) { +func ParseLocationName(remote string) (locationName string) { + if strings.HasSuffix(string(remote), "/") { + remote = remote[:len(remote)-1] + } + parts := strings.SplitN(string(remote), "/", 2) + if len(parts) >= 1 { + return parts[0] + } + return +} + +func parseBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) { loc = &remote_pb.RemoteStorageLocation{} if strings.HasSuffix(string(remote), "/") { remote = remote[:len(remote)-1] @@ -28,6 +39,22 @@ func ParseLocation(remote string) (loc *remote_pb.RemoteStorageLocation) { return } +func parseNoBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) { + loc = &remote_pb.RemoteStorageLocation{} + if strings.HasSuffix(string(remote), "/") { + remote = remote[:len(remote)-1] + } + parts := strings.SplitN(string(remote), "/", 2) + if len(parts) >= 1 { + loc.Name = parts[0] + } + loc.Path = string(remote[len(loc.Name):]) + if loc.Path == "" { + loc.Path = "/" + } + return +} + func FormatLocation(loc *remote_pb.RemoteStorageLocation) string { return fmt.Sprintf("%s/%s%s", loc.Name, loc.Bucket, loc.Path) } @@ -45,6 +72,7 @@ type RemoteStorageClient interface { type RemoteStorageClientMaker interface { Make(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) + HasBucket() bool } var ( @@ -53,6 +81,18 @@ var ( remoteStorageClientsLock sync.Mutex ) +func ParseRemoteLocation(remoteConfType string, remote string) (remoteStorageLocation *remote_pb.RemoteStorageLocation, err error) { + maker, found := RemoteStorageClientMakers[remoteConfType] + if !found { + return nil, fmt.Errorf("remote storage type %s not found", remoteConfType) + } + + if !maker.HasBucket() { + return parseNoBucketLocation(remote), nil + } + return parseBucketLocation(remote), nil +} + func makeRemoteStorageClient(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) { maker, found := RemoteStorageClientMakers[remoteConf.Type] if !found { diff --git a/weed/remote_storage/s3/aliyun.go b/weed/remote_storage/s3/aliyun.go index 3a681369a..567c74299 100644 --- a/weed/remote_storage/s3/aliyun.go +++ b/weed/remote_storage/s3/aliyun.go @@ -18,6 +18,10 @@ func init() { type AliyunRemoteStorageMaker struct{} +func (s AliyunRemoteStorageMaker) HasBucket() bool { + return true +} + func (s AliyunRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &s3RemoteStorageClient{ conf: conf, diff --git a/weed/remote_storage/s3/backblaze.go b/weed/remote_storage/s3/backblaze.go index e4833999e..914f0ca44 100644 --- a/weed/remote_storage/s3/backblaze.go +++ b/weed/remote_storage/s3/backblaze.go @@ -16,6 +16,10 @@ func init() { type BackBlazeRemoteStorageMaker struct{} +func (s BackBlazeRemoteStorageMaker) HasBucket() bool { + return true +} + func (s BackBlazeRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &s3RemoteStorageClient{ conf: conf, diff --git a/weed/remote_storage/s3/baidu.go b/weed/remote_storage/s3/baidu.go index 8c5bf7d1b..dfcf32512 100644 --- a/weed/remote_storage/s3/baidu.go +++ b/weed/remote_storage/s3/baidu.go @@ -18,6 +18,10 @@ func init() { type BaiduRemoteStorageMaker struct{} +func (s BaiduRemoteStorageMaker) HasBucket() bool { + return true +} + func (s BaiduRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &s3RemoteStorageClient{ conf: conf, diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go index 5fadcbc3b..a210683aa 100644 --- a/weed/remote_storage/s3/s3_storage_client.go +++ b/weed/remote_storage/s3/s3_storage_client.go @@ -23,6 +23,10 @@ func init() { type s3RemoteStorageMaker struct{} +func (s s3RemoteStorageMaker) HasBucket() bool { + return true +} + func (s s3RemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &s3RemoteStorageClient{ conf: conf, diff --git a/weed/remote_storage/s3/tencent.go b/weed/remote_storage/s3/tencent.go index e2591ca8c..9df72a7e2 100644 --- a/weed/remote_storage/s3/tencent.go +++ b/weed/remote_storage/s3/tencent.go @@ -18,6 +18,10 @@ func init() { type TencentRemoteStorageMaker struct{} +func (s TencentRemoteStorageMaker) HasBucket() bool { + return true +} + func (s TencentRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &s3RemoteStorageClient{ conf: conf, diff --git a/weed/remote_storage/s3/wasabi.go b/weed/remote_storage/s3/wasabi.go index 6f8fc3ca8..29cdf7395 100644 --- a/weed/remote_storage/s3/wasabi.go +++ b/weed/remote_storage/s3/wasabi.go @@ -18,6 +18,10 @@ func init() { type WasabiRemoteStorageMaker struct{} +func (s WasabiRemoteStorageMaker) HasBucket() bool { + return true +} + func (s WasabiRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { client := &s3RemoteStorageClient{ conf: conf, |
