path: root/weed/remote_storage
author     Chris Lu <chris.lu@gmail.com>   2021-08-24 01:18:30 -0700
committer  Chris Lu <chris.lu@gmail.com>   2021-08-24 01:18:30 -0700
commit     e9ebe24f2e0f8b30284ea9334d5924f54e970c95 (patch)
tree       c160f0e8d5bd74231fc7f81563f13bcd3a756128 /weed/remote_storage
parent     7c39a18ba5985cfcd40fbccea0945874b297d9f2 (diff)
download   seaweedfs-e9ebe24f2e0f8b30284ea9334d5924f54e970c95.tar.xz
           seaweedfs-e9ebe24f2e0f8b30284ea9334d5924f54e970c95.zip
cloud drive: add support for Azure
Diffstat (limited to 'weed/remote_storage')
-rw-r--r--   weed/remote_storage/azure/azure_highlevel.go        120
-rw-r--r--   weed/remote_storage/azure/azure_storage_client.go   207
2 files changed, 327 insertions(+), 0 deletions(-)
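
The commit wires the new client into SeaweedFS through the init() in azure_storage_client.go, which registers an azureRemoteStorageMaker under the key "azure" in remote_storage.RemoteStorageClientMakers. A minimal usage sketch follows; the account, container, and path names are hypothetical, and only RemoteConf and RemoteStorageLocation fields that appear in this diff are used:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	_ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" // runs the init() that registers "azure"
)

func main() {
	maker := remote_storage.RemoteStorageClientMakers["azure"]
	client, err := maker.Make(&filer_pb.RemoteConf{
		Name:             "cloud1",      // hypothetical remote storage name
		AzureAccountName: "myaccount",   // empty falls back to AZURE_STORAGE_ACCOUNT
		AzureAccountKey:  "account-key", // empty falls back to AZURE_STORAGE_ACCESS_KEY
	})
	if err != nil {
		log.Fatalf("create azure client: %v", err)
	}
	// read the first 1KB of a blob
	data, err := client.ReadFile(&filer_pb.RemoteStorageLocation{
		Bucket: "my-container",
		Path:   "/folder/file.txt",
	}, 0, 1024)
	if err != nil {
		log.Fatalf("read file: %v", err)
	}
	fmt.Printf("read %d bytes\n", len(data))
}
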
diff --git a/weed/remote_storage/azure/azure_highlevel.go b/weed/remote_storage/azure/azure_highlevel.go
new file mode 100644
index 000000000..9b735c2cb
--- /dev/null
+++ b/weed/remote_storage/azure/azure_highlevel.go
@@ -0,0 +1,120 @@
+package azure
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "github.com/Azure/azure-pipeline-go/pipeline"
+ . "github.com/Azure/azure-storage-blob-go/azblob"
+ "io"
+ "sync"
+)
+
+// copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6
+// because uploadReaderAtToBlockBlob is not exported there
+
+// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
+func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
+ blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
+ if o.BlockSize == 0 {
+ // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
+ if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
+ return nil, errors.New("buffer is too large to upload to a block blob")
+ }
+ // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
+ if readerSize <= BlockBlobMaxUploadBlobBytes {
+ o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
+ } else {
+ o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
+ if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
+ o.BlockSize = BlobDefaultDownloadBlockSize
+ }
+ // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
+ }
+ }
+
+ if readerSize <= BlockBlobMaxUploadBlobBytes {
+ // If the size can fit in 1 Upload call, do it this way
+ var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
+ if o.Progress != nil {
+ body = pipeline.NewRequestBodyProgress(body, o.Progress)
+ }
+ return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
+ }
+
+ var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)
+
+ blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
+ progress := int64(0)
+ progressLock := &sync.Mutex{}
+
+ err := DoBatchTransfer(ctx, BatchTransferOptions{
+ OperationName: "uploadReaderAtToBlockBlob",
+ TransferSize: readerSize,
+ ChunkSize: o.BlockSize,
+ Parallelism: o.Parallelism,
+ Operation: func(offset int64, count int64, ctx context.Context) error {
+ // This function is called once per block.
+ // It is passed this block's offset within the buffer and its count of bytes
+ // Prepare to read the proper block/section of the buffer
+ var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
+ blockNum := offset / o.BlockSize
+ if o.Progress != nil {
+ blockProgress := int64(0)
+ body = pipeline.NewRequestBodyProgress(body,
+ func(bytesTransferred int64) {
+ diff := bytesTransferred - blockProgress
+ blockProgress = bytesTransferred
+ progressLock.Lock() // 1 goroutine at a time gets a progress report
+ progress += diff
+ o.Progress(progress)
+ progressLock.Unlock()
+ })
+ }
+
+ // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
+ // at the same time causing PutBlockList to get a mix of blocks from all the clients.
+ blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
+ _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
+ return err
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ // All put blocks were successful, call Put Block List to finalize the blob
+ return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
+}
+
+// The UUID reserved variants.
+const (
+ reservedNCS byte = 0x80
+ reservedRFC4122 byte = 0x40
+ reservedMicrosoft byte = 0x20
+ reservedFuture byte = 0x00
+)
+
+type uuid [16]byte
+
+// newUUID returns a new uuid using the RFC 4122 algorithm.
+func newUUID() (u uuid) {
+ u = uuid{}
+ // Set all bits to randomly (or pseudo-randomly) chosen values.
+ rand.Read(u[:])
+ u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)
+
+ var version byte = 4
+ u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
+ return
+}
+
+// String returns an unparsed version of the generated UUID sequence.
+func (u uuid) String() string {
+ return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
+}
+
+func (u uuid) bytes() []byte {
+ return u[:]
+}
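
The helper above accepts any io.ReaderAt plus an explicit size, so callers inside this package can upload from memory or from a file without buffering the whole blob. A minimal sketch of driving it from within the package (hypothetical container and blob names; the 4MB block size and parallelism of 16 mirror what WriteFile passes in azure_storage_client.go below):

	// containerURL is assumed to be an azblob.ContainerURL built via
	// azblob.NewServiceURL(...).NewContainerURL("my-container")
	data := []byte("hello, azure block blob")
	blobURL := containerURL.NewBlockBlobURL("example/hello.txt") // hypothetical blob key
	_, err := uploadReaderAtToBlockBlob(context.Background(),
		bytes.NewReader(data), // *bytes.Reader implements io.ReaderAt
		int64(len(data)),
		blobURL,
		azblob.UploadToBlockBlobOptions{
			BlockSize:   4 * 1024 * 1024, // 4MB blocks, as in WriteFile below
			Parallelism: 16,
		})
	if err != nil {
		// handle the upload failure
	}
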
diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go
new file mode 100644
index 000000000..364eb07ab
--- /dev/null
+++ b/weed/remote_storage/azure/azure_storage_client.go
@@ -0,0 +1,207 @@
+package azure
+
+import (
+ "context"
+ "fmt"
+ "github.com/Azure/azure-storage-blob-go/azblob"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "io/ioutil"
+ "net/url"
+ "os"
+ "reflect"
+)
+
+func init() {
+ remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
+}
+
+type azureRemoteStorageMaker struct{}
+
+func (s azureRemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
+
+ client := &azureRemoteStorageClient{
+ conf: conf,
+ }
+
+ accountName, accountKey := conf.AzureAccountName, conf.AzureAccountKey
+ if len(accountName) == 0 || len(accountKey) == 0 {
+ accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+ if len(accountName) == 0 || len(accountKey) == 0 {
+ return nil, fmt.Errorf("either AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
+ }
+ }
+
+ // Use your Storage account's name and key to create a credential object.
+ credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
+ if err != nil {
+ return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
+ }
+
+ // Create a request pipeline that is used to process HTTP(S) requests and responses.
+ p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
+
+ // Create a ServiceURL object that wraps the service URL and a request pipeline.
+ u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
+ client.serviceURL = azblob.NewServiceURL(*u, p)
+
+ return client, nil
+}
+
+type azureRemoteStorageClient struct {
+ conf *filer_pb.RemoteConf
+ serviceURL azblob.ServiceURL
+}
+
+var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
+
+func (az *azureRemoteStorageClient) Traverse(loc *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
+
+ pathKey := loc.Path[1:]
+ containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+
+ // List the blobs in the container, filtered by the pathKey prefix
+ for marker := (azblob.Marker{}); marker.NotDone(); {
+ // Get a result segment starting with the blob indicated by the current Marker.
+ listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
+ Prefix: pathKey,
+ })
+ if err != nil {
+ return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
+ }
+
+ // ListBlobs returns the start of the next segment; you MUST use this to get
+ // the next segment (after processing the current result segment).
+ marker = listBlob.NextMarker
+
+ // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
+ for _, blobInfo := range listBlob.Segment.BlobItems {
+ key := blobInfo.Name
+ key = "/" + key
+ dir, name := util.FullPath(key).DirAndName()
+ err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
+ RemoteMtime: blobInfo.Properties.LastModified.Unix(),
+ RemoteSize: *blobInfo.Properties.ContentLength,
+ RemoteETag: string(blobInfo.Properties.Etag),
+ StorageName: az.conf.Name,
+ })
+ if err != nil {
+ return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
+ }
+ }
+ }
+
+ return
+}
+func (az *azureRemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
+
+ key := loc.Path[1:]
+ containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+ blobURL := containerURL.NewBlockBlobURL(key)
+
+ downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
+ if readErr != nil {
+ return nil, readErr
+ }
+ // NOTE: automatic retries are performed if the connection fails
+ bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
+ defer bodyStream.Close()
+
+ data, err = ioutil.ReadAll(bodyStream)
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
+ }
+
+ return
+}
+
+func (az *azureRemoteStorageClient) WriteDirectory(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
+ return nil
+}
+
+func (az *azureRemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
+
+ key := loc.Path[1:]
+ containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+ blobURL := containerURL.NewBlockBlobURL(key)
+
+ readerAt, ok := reader.(io.ReaderAt)
+ if !ok {
+ return nil, fmt.Errorf("unexpected reader: readerAt expected")
+ }
+ fileSize := int64(filer.FileSize(entry))
+
+ _, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
+ BlockSize: 4 * 1024 * 1024,
+ Parallelism: 16})
+ if err != nil {
+ return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
+ }
+
+ metadata := toMetadata(entry.Extended)
+ if len(metadata) > 0 {
+ _, err = blobURL.SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("azure set metadata on %s%s: %v", loc.Bucket, loc.Path, err)
+ }
+ }
+
+ // read back the remote entry
+ return az.readFileRemoteEntry(loc)
+
+}
+
+func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
+ key := loc.Path[1:]
+ containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+ blobURL := containerURL.NewBlockBlobURL(key)
+
+ attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &filer_pb.RemoteEntry{
+ RemoteMtime: attr.LastModified().Unix(),
+ RemoteSize: attr.ContentLength(),
+ RemoteETag: string(attr.ETag()),
+ StorageName: az.conf.Name,
+ }, nil
+
+}
+
+func toMetadata(attributes map[string][]byte) map[string]string {
+ metadata := make(map[string]string)
+ for k, v := range attributes {
+ metadata[k] = string(v)
+ }
+ return metadata
+}
+
+func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
+ if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
+ return nil
+ }
+ metadata := toMetadata(newEntry.Extended)
+
+ key := loc.Path[1:]
+ containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+
+ _, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+
+ return
+}
+func (az *azureRemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
+ key := loc.Path[1:]
+ containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+ if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
+ azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
+ return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
+ }
+ return
+}
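
Traverse lists the container in flat segments and calls visitFn once per blob, passing the directory and name split out of the blob key plus a RemoteEntry built from the blob properties. A minimal caller sketch; the VisitFunc signature is assumed from the visitFn call inside Traverse, and client is a client created as in the earlier sketch:

	err := client.Traverse(&filer_pb.RemoteStorageLocation{
		Bucket: "my-container",
		Path:   "/",
	}, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
		// print each blob with the size and mtime recorded by Traverse
		fmt.Printf("%s/%s size=%d mtime=%d\n", dir, name, remoteEntry.RemoteSize, remoteEntry.RemoteMtime)
		return nil
	})
	if err != nil {
		// handle the traversal failure
	}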