aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-04 01:14:44 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-04 01:14:44 -0700
commit56c5c7b1b6ecd5b43ee0d47a72a74049a3d54264 (patch)
tree79a21ac7bc6110a667152fe0019254f6bd57958e
parente8ef501f02217d29e35f4a52c86be3d93939ae6f (diff)
downloadseaweedfs-56c5c7b1b6ecd5b43ee0d47a72a74049a3d54264.tar.xz
seaweedfs-56c5c7b1b6ecd5b43ee0d47a72a74049a3d54264.zip
add google cloud storage
-rw-r--r--weed/command/scaffold.go10
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go130
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go5
3 files changed, 144 insertions, 1 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index e0bd48c8c..fbe6934e1 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -178,6 +178,7 @@ collection = ""
ttlSec = 0
[sink.s3]
+# experimental
# See https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
# default loads credentials from the shared credentials file (~/.aws/credentials).
enabled = false
@@ -187,5 +188,14 @@ region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "" # destination directory (do not prefix or suffix with "/")
+[sink.google_cloud_storage]
+# experimental
+# see https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+projectId = "your_project_id"
+bucket = "your_bucket_name" # an existing bucket
+directory = "/" # destination directory
+
`
)
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
new file mode 100644
index 000000000..f96cae7df
--- /dev/null
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -0,0 +1,130 @@
+package gcssink
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "cloud.google.com/go/storage"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "os"
+ "google.golang.org/api/option"
+)
+
+type GcsSink struct {
+ client *storage.Client
+ projectId string
+ bucket string
+ dir string
+ filerSource *source.FilerSource
+}
+
+func init() {
+ sink.Sinks = append(sink.Sinks, &GcsSink{})
+}
+
+func (g *GcsSink) GetName() string {
+ return "google_cloud_storage"
+}
+
+func (g *GcsSink) GetSinkToDirectory() string {
+ return g.dir
+}
+
+func (g *GcsSink) Initialize(configuration util.Configuration) error {
+ return g.initialize(
+ configuration.GetString("google_application_credentials"),
+ configuration.GetString("projectId"),
+ configuration.GetString("bucket"),
+ configuration.GetString("directory"),
+ )
+}
+
+func (g *GcsSink) SetSourceFiler(s *source.FilerSource) {
+ g.filerSource = s
+}
+
+func (g *GcsSink) initialize(google_application_credentials, projectId, bucketName, dir string) (error) {
+ g.projectId = projectId
+ g.bucket = bucketName
+ g.dir = dir
+
+ ctx := context.Background()
+ // Creates a client.
+ if google_application_credentials == "" {
+ var found bool
+ google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
+ if !found {
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
+ }
+ }
+ client, err := storage.NewClient(ctx, option.WithCredentialsFile(google_application_credentials))
+ if err != nil {
+ log.Fatalf("Failed to create client: %v", err)
+ }
+
+ g.client = client
+
+ return nil
+}
+
+func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
+
+ if isDirectory {
+ key = key + "/"
+ }
+
+ if err := g.client.Bucket(g.bucket).Object(key).Delete(context.Background()); err != nil {
+ return fmt.Errorf("gcs delete %s %s", g.bucket, key)
+ }
+
+ return nil
+
+}
+
+func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
+
+ if entry.IsDirectory {
+ return nil
+ }
+
+ totalSize := filer2.TotalSize(entry.Chunks)
+ chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
+
+ ctx := context.Background()
+
+ wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx)
+
+ for _, chunk := range chunkViews {
+
+ fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
+ if err != nil {
+ return err
+ }
+
+ _, err = util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+ wc.Write(data)
+ })
+
+ if err != nil {
+ return err
+ }
+
+ }
+
+ if err := wc.Close(); err != nil {
+ return err
+ }
+
+ return nil
+
+}
+
+func (g *GcsSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+ // TODO improve efficiency
+ return false, nil
+}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index c8300a108..ee2e46422 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -17,7 +17,6 @@ import (
)
type S3Sink struct {
- err error
conn s3iface.S3API
region string
bucket string
@@ -84,6 +83,10 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
+ if entry.IsDirectory {
+ return nil
+ }
+
uploadId, err := s3sink.createMultipartUpload(key, entry)
if err != nil {
return err