aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-09 01:35:48 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-09 01:35:48 -0700
commit9f4c2f87d3415e656b2ef5109522a218317d858d (patch)
tree9d5ae2c5ea388097ed3e75c1a93771187eba62e2
parent453d0be4d4f3f8d6c1333ab99f0281ed4ba8c5a8 (diff)
downloadseaweedfs-9f4c2f87d3415e656b2ef5109522a218317d858d.tar.xz
seaweedfs-9f4c2f87d3415e656b2ef5109522a218317d858d.zip
adding filer replication sink to Azure
-rw-r--r--weed/command/scaffold.go8
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go133
2 files changed, 141 insertions, 0 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index cdcb570bf..2e7aa0cb6 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -194,5 +194,13 @@ google_application_credentials = "/path/to/x.json" # path to json credential fil
bucket = "your_bucket_seaweedfs" # an existing bucket
directory = "/" # destination directory
+[sink.azure]
+# experimental
+enabled = false
+account_name = ""
+account_key = ""
+container = "mycontainer" # an existing container
+directory = "" # destination directory (do not prefix or suffix with "/")
+
`
)
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
new file mode 100644
index 000000000..556dc4a6a
--- /dev/null
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -0,0 +1,133 @@
+package azuresink
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "bytes"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
+)
+
+// AzureSink replicates filer entries into an Azure Blob Storage container.
+type AzureSink struct {
+	containerURL azblob.ContainerURL // handle to the destination container, built in initialize()
+	container    string              // container name from the sink.azure "container" setting
+	dir          string              // destination directory from the sink.azure "directory" setting
+	filerSource  *source.FilerSource // source filer, used to resolve chunk file ids to readable URLs
+}
+
+// init registers an AzureSink instance in the global replication sink registry.
+func init() {
+	sink.Sinks = append(sink.Sinks, &AzureSink{})
+}
+
+func (g *AzureSink) GetName() string {
+ return "azure"
+}
+
+// GetSinkToDirectory returns the destination directory this sink writes under.
+func (g *AzureSink) GetSinkToDirectory() string {
+	dir := g.dir
+	return dir
+}
+
+// Initialize reads the sink.azure configuration section and sets up the sink.
+func (g *AzureSink) Initialize(configuration util.Configuration) error {
+	accountName := configuration.GetString("account_name")
+	accountKey := configuration.GetString("account_key")
+	container := configuration.GetString("container")
+	directory := configuration.GetString("directory")
+	return g.initialize(accountName, accountKey, container, directory)
+}
+
+// SetSourceFiler wires in the filer source used to look up chunk data URLs.
+func (g *AzureSink) SetSourceFiler(s *source.FilerSource) {
+	g.filerSource = s
+}
+
+// initialize stores the container/directory settings and builds the
+// authenticated container URL used by all subsequent blob operations.
+func (g *AzureSink) initialize(accountName, accountKey, container, dir string) error {
+	g.container = container
+	g.dir = dir
+
+	// Use your Storage account's name and key to create a credential object.
+	credential := azblob.NewSharedKeyCredential(accountName, accountKey)
+
+	// Create a request pipeline that is used to process HTTP(S) requests and responses.
+	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
+
+	// Create a ServiceURL object that wraps the service URL and a request pipeline.
+	// An invalid account name makes url.Parse fail; report it instead of ignoring it.
+	u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
+	if err != nil {
+		return fmt.Errorf("azure parse account url for %s: %v", accountName, err)
+	}
+	serviceURL := azblob.NewServiceURL(*u, p)
+
+	g.containerURL = serviceURL.NewContainerURL(g.container)
+
+	return nil
+}
+
+// DeleteEntry removes the blob identified by key from the container.
+// Directory entries are stored under a key with a trailing "/".
+func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
+
+	if isDirectory {
+		key += "/"
+	}
+
+	ctx := context.Background()
+
+	blobURL := g.containerURL.NewBlobURL(key)
+	_, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
+	if err != nil {
+		return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
+	}
+
+	return nil
+
+}
+
+// CreateEntry writes the file content of entry into an Azure append blob at key,
+// streaming each chunk from the source filer. Directory entries carry no content
+// and are skipped.
+func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
+
+	if entry.IsDirectory {
+		return nil
+	}
+
+	// Resolve the entry's chunk list into ordered views covering the full file size.
+	totalSize := filer2.TotalSize(entry.Chunks)
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
+
+	ctx := context.Background()
+
+	// Create a URL that references a to-be-created blob in your
+	// Azure Storage account's container.
+	appendBlobURL := g.containerURL.NewAppendBlobURL(key)
+
+	// Create the (empty) append blob first; chunks are appended below.
+	_, err := appendBlobURL.Create(ctx, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
+	if err != nil {
+		return err
+	}
+
+	for _, chunk := range chunkViews {
+
+		// Translate the chunk's file id into a URL readable from this process.
+		fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
+		if err != nil {
+			return err
+		}
+
+		// ReadUrlAsStream invokes the callback as data arrives; the append error
+		// is captured in writeErr so a failed AppendBlock is not silently dropped.
+		var writeErr error
+		_, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+			_, writeErr = appendBlobURL.AppendBlock(ctx, bytes.NewReader(data), azblob.BlobAccessConditions{})
+		})
+
+		// Report the read failure first, then any write failure from the callback.
+		if readErr != nil {
+			return readErr
+		}
+		if writeErr != nil {
+			return writeErr
+		}
+
+	}
+
+	return nil
+
+}
+
+// UpdateEntry is a stub: it always reports that no existing entry was found.
+// NOTE(review): returning false presumably makes the caller fall back to a
+// delete+create cycle — confirm against the replication sink contract.
+func (g *AzureSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+	// TODO improve efficiency
+	return false, nil
+}