| author | Chris Lu <chris.lu@gmail.com> | 2021-08-24 01:18:30 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-08-24 01:18:30 -0700 |
| commit | e9ebe24f2e0f8b30284ea9334d5924f54e970c95 (patch) | |
| tree | c160f0e8d5bd74231fc7f81563f13bcd3a756128 /weed/remote_storage | |
| parent | 7c39a18ba5985cfcd40fbccea0945874b297d9f2 (diff) | |
| download | seaweedfs-e9ebe24f2e0f8b30284ea9334d5924f54e970c95.tar.xz seaweedfs-e9ebe24f2e0f8b30284ea9334d5924f54e970c95.zip | |
cloud drive: add support for Azure
Diffstat (limited to 'weed/remote_storage')
| -rw-r--r-- | weed/remote_storage/azure/azure_highlevel.go | 120 |
| -rw-r--r-- | weed/remote_storage/azure/azure_storage_client.go | 207 |
2 files changed, 327 insertions, 0 deletions
```diff
diff --git a/weed/remote_storage/azure/azure_highlevel.go b/weed/remote_storage/azure/azure_highlevel.go
new file mode 100644
index 000000000..9b735c2cb
--- /dev/null
+++ b/weed/remote_storage/azure/azure_highlevel.go
@@ -0,0 +1,120 @@
+package azure
+
+import (
+    "context"
+    "crypto/rand"
+    "encoding/base64"
+    "errors"
+    "fmt"
+    "github.com/Azure/azure-pipeline-go/pipeline"
+    . "github.com/Azure/azure-storage-blob-go/azblob"
+    "io"
+    "sync"
+)
+
+// copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6
+// uploadReaderAtToBlockBlob was not public
+
+// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
+func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
+    blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
+    if o.BlockSize == 0 {
+        // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
+        if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
+            return nil, errors.New("buffer is too large to upload to a block blob")
+        }
+        // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
+        if readerSize <= BlockBlobMaxUploadBlobBytes {
+            o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
+        } else {
+            o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
+            if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
+                o.BlockSize = BlobDefaultDownloadBlockSize
+            }
+            // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
+        }
+    }
+
+    if readerSize <= BlockBlobMaxUploadBlobBytes {
+        // If the size can fit in 1 Upload call, do it this way
+        var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
+        if o.Progress != nil {
+            body = pipeline.NewRequestBodyProgress(body, o.Progress)
+        }
+        return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
+    }
+
+    var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)
+
+    blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
+    progress := int64(0)
+    progressLock := &sync.Mutex{}
+
+    err := DoBatchTransfer(ctx, BatchTransferOptions{
+        OperationName: "uploadReaderAtToBlockBlob",
+        TransferSize:  readerSize,
+        ChunkSize:     o.BlockSize,
+        Parallelism:   o.Parallelism,
+        Operation: func(offset int64, count int64, ctx context.Context) error {
+            // This function is called once per block.
+            // It is passed this block's offset within the buffer and its count of bytes
+            // Prepare to read the proper block/section of the buffer
+            var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
+            blockNum := offset / o.BlockSize
+            if o.Progress != nil {
+                blockProgress := int64(0)
+                body = pipeline.NewRequestBodyProgress(body,
+                    func(bytesTransferred int64) {
+                        diff := bytesTransferred - blockProgress
+                        blockProgress = bytesTransferred
+                        progressLock.Lock() // 1 goroutine at a time gets a progress report
+                        progress += diff
+                        o.Progress(progress)
+                        progressLock.Unlock()
+                    })
+            }
+
+            // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
+            // at the same time causing PutBlockList to get a mix of blocks from all the clients.
+            blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
+            _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
+            return err
+        },
+    })
+    if err != nil {
+        return nil, err
+    }
+    // All put blocks were successful, call Put Block List to finalize the blob
+    return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
+}
+
+// The UUID reserved variants.
+const (
+    reservedNCS       byte = 0x80
+    reservedRFC4122   byte = 0x40
+    reservedMicrosoft byte = 0x20
+    reservedFuture    byte = 0x00
+)
+
+type uuid [16]byte
+
+// NewUUID returns a new uuid using RFC 4122 algorithm.
+func newUUID() (u uuid) {
+    u = uuid{}
+    // Set all bits to randomly (or pseudo-randomly) chosen values.
+    rand.Read(u[:])
+    u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)
+
+    var version byte = 4
+    u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
+    return
+}
+
+// String returns an unparsed version of the generated UUID sequence.
+func (u uuid) String() string {
+    return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
+}
+
+func (u uuid) bytes() []byte {
+    return u[:]
+}
diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go
new file mode 100644
index 000000000..364eb07ab
--- /dev/null
+++ b/weed/remote_storage/azure/azure_storage_client.go
@@ -0,0 +1,207 @@
+package azure
+
+import (
+    "context"
+    "fmt"
+    "github.com/Azure/azure-storage-blob-go/azblob"
+    "github.com/chrislusf/seaweedfs/weed/filer"
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+    "github.com/chrislusf/seaweedfs/weed/remote_storage"
+    "github.com/chrislusf/seaweedfs/weed/util"
+    "io"
+    "io/ioutil"
+    "net/url"
+    "os"
+    "reflect"
+)
+
+func init() {
+    remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
+}
+
+type azureRemoteStorageMaker struct{}
+
+func (s azureRemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
+
+    client := &azureRemoteStorageClient{
+        conf: conf,
+    }
+
+    accountName, accountKey := conf.AzureAccountName, conf.AzureAccountKey
+    if len(accountName) == 0 || len(accountKey) == 0 {
+        accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+        if len(accountName) == 0 || len(accountKey) == 0 {
+            return nil, fmt.Errorf("either AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
+        }
+    }
+
+    // Use your Storage account's name and key to create a credential object.
+    credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
+    if err != nil {
+        return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
+    }
+
+    // Create a request pipeline that is used to process HTTP(S) requests and responses.
+    p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
+
+    // Create an ServiceURL object that wraps the service URL and a request pipeline.
+    u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
+    client.serviceURL = azblob.NewServiceURL(*u, p)
+
+    return client, nil
+}
+
+type azureRemoteStorageClient struct {
+    conf       *filer_pb.RemoteConf
+    serviceURL azblob.ServiceURL
+}
+
+var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
+
+func (az *azureRemoteStorageClient) Traverse(loc *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
+
+    pathKey := loc.Path[1:]
+    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+
+    // List the container that we have created above
+    for marker := (azblob.Marker{}); marker.NotDone(); {
+        // Get a result segment starting with the blob indicated by the current Marker.
+        listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
+            Prefix: pathKey,
+        })
+        if err != nil {
+            return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
+        }
+
+        // ListBlobs returns the start of the next segment; you MUST use this to get
+        // the next segment (after processing the current result segment).
+        marker = listBlob.NextMarker
+
+        // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
+        for _, blobInfo := range listBlob.Segment.BlobItems {
+            key := blobInfo.Name
+            key = "/" + key
+            dir, name := util.FullPath(key).DirAndName()
+            err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
+                RemoteMtime: blobInfo.Properties.LastModified.Unix(),
+                RemoteSize:  *blobInfo.Properties.ContentLength,
+                RemoteETag:  string(blobInfo.Properties.Etag),
+                StorageName: az.conf.Name,
+            })
+            if err != nil {
+                return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
+            }
+        }
+    }
+
+    return
+}
+func (az *azureRemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
+
+    key := loc.Path[1:]
+    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+    blobURL := containerURL.NewBlockBlobURL(key)
+
+    downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
+    if readErr != nil {
+        return nil, readErr
+    }
+    // NOTE: automatically retries are performed if the connection fails
+    bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
+    defer bodyStream.Close()
+
+    data, err = ioutil.ReadAll(bodyStream)
+
+    if err != nil {
+        return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
+    }
+
+    return
+}
+
+func (az *azureRemoteStorageClient) WriteDirectory(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
+    return nil
+}
+
+func (az *azureRemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
+
+    key := loc.Path[1:]
+    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+    blobURL := containerURL.NewBlockBlobURL(key)
+
+    readerAt, ok := reader.(io.ReaderAt)
+    if !ok {
+        return nil, fmt.Errorf("unexpected reader: readerAt expected")
+    }
+    fileSize := int64(filer.FileSize(entry))
+
+    _, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
+        BlockSize:   4 * 1024 * 1024,
+        Parallelism: 16})
+    if err != nil {
+        return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
+    }
+
+    metadata := toMetadata(entry.Extended)
+    if len(metadata) > 0 {
+        _, err = blobURL.SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+        if err != nil {
+            return nil, fmt.Errorf("azure set metadata on %s%s: %v", loc.Bucket, loc.Path, err)
+        }
+    }
+
+    // read back the remote entry
+    return az.readFileRemoteEntry(loc)
+
+}
+
+func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
+    key := loc.Path[1:]
+    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+    blobURL := containerURL.NewBlockBlobURL(key)
+
+    attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+
+    if err != nil {
+        return nil, err
+    }
+
+    return &filer_pb.RemoteEntry{
+        RemoteMtime: attr.LastModified().Unix(),
+        RemoteSize:  attr.ContentLength(),
+        RemoteETag:  string(attr.ETag()),
+        StorageName: az.conf.Name,
+    }, nil
+
+}
+
+func toMetadata(attributes map[string][]byte) map[string]string {
+    metadata := make(map[string]string)
+    for k, v := range attributes {
+        metadata[k] = string(v)
+    }
+    return metadata
+}
+
+func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
+    if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
+        return nil
+    }
+    metadata := toMetadata(newEntry.Extended)
+
+    key := loc.Path[1:]
+    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+
+    _, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+
+    return
+}
+func (az *azureRemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
+    key := loc.Path[1:]
+    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+    if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
+        azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
+        return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
+    }
+    return
+}
```
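For reference, a minimal sketch of how the "azure" maker registered by this commit's `init()` might be looked up and used. The account name, key, container, and blob path below are illustrative placeholders (not values from the commit); only the `RemoteStorageClientMakers` registry, `Make`, and `ReadFile` calls are taken from the diff above.

```go
package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"

	// blank import so the azure package's init() registers its maker
	_ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure"
)

func main() {
	// Look up the maker registered under the "azure" key.
	maker, found := remote_storage.RemoteStorageClientMakers["azure"]
	if !found {
		log.Fatal("azure remote storage maker not registered")
	}

	// Placeholder credentials; if left empty, Make falls back to the
	// AZURE_STORAGE_ACCOUNT / AZURE_STORAGE_ACCESS_KEY environment variables.
	client, err := maker.Make(&filer_pb.RemoteConf{
		Name:             "azure1",
		AzureAccountName: "myaccount",
		AzureAccountKey:  "bXlrZXk=",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Read the first 1 KiB of an object; container and path are illustrative.
	data, err := client.ReadFile(&filer_pb.RemoteStorageLocation{
		Bucket: "my-container",
		Path:   "/dir/file.txt",
	}, 0, 1024)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(data), "bytes read")
}
```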
