| author | Chris Lu <chris.lu@gmail.com> | 2018-10-23 23:59:40 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-10-23 23:59:40 -0700 |
| commit | c58b73ad9bf596337b4ed254ae341e935c8600b4 (patch) | |
| tree | 53b48e8ea7ead791f641594cf6ad0df70c92183d /weed/replication | |
| parent | 7f1de8677996aabb6b0ec6a25b92bdef335bd9da (diff) | |
| download | seaweedfs-c58b73ad9bf596337b4ed254ae341e935c8600b4.tar.xz seaweedfs-c58b73ad9bf596337b4ed254ae341e935c8600b4.zip | |
adding BackBlaze, Azure
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 128 |
1 file changed, 128 insertions, 0 deletions
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
new file mode 100644
index 000000000..ce0e9eb3c
--- /dev/null
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -0,0 +1,128 @@
+package B2Sink
+
+import (
+	"context"
+
+	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/replication/sink"
+	"github.com/chrislusf/seaweedfs/weed/replication/source"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/kurin/blazer/b2"
+)
+
+type B2Sink struct {
+	client      *b2.Client
+	bucket      string
+	dir         string
+	filerSource *source.FilerSource
+}
+
+func init() {
+	sink.Sinks = append(sink.Sinks, &B2Sink{})
+}
+
+func (g *B2Sink) GetName() string {
+	return "backblaze"
+}
+
+func (g *B2Sink) GetSinkToDirectory() string {
+	return g.dir
+}
+
+func (g *B2Sink) Initialize(configuration util.Configuration) error {
+	return g.initialize(
+		configuration.GetString("account_id"),
+		configuration.GetString("account_key"),
+		configuration.GetString("bucket"),
+		configuration.GetString("directory"),
+	)
+}
+
+func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
+	g.filerSource = s
+}
+
+func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
+	ctx := context.Background()
+	client, err := b2.NewClient(ctx, accountId, accountKey)
+	if err != nil {
+		return err
+	}
+
+	g.client = client
+	g.bucket = bucket
+	g.dir = dir
+
+	return nil
+}
+
+func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
+
+	if isDirectory {
+		key = key + "/"
+	}
+
+	ctx := context.Background()
+
+	bucket, err := g.client.Bucket(ctx, g.bucket)
+	if err != nil {
+		return err
+	}
+
+	targetObject := bucket.Object(key)
+
+	return targetObject.Delete(ctx)
+
+}
+
+func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
+
+	if entry.IsDirectory {
+		return nil
+	}
+
+	totalSize := filer2.TotalSize(entry.Chunks)
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
+
+	ctx := context.Background()
+
+	bucket, err := g.client.Bucket(ctx, g.bucket)
+	if err != nil {
+		return err
+	}
+
+	targetObject := bucket.Object(key)
+	writer := targetObject.NewWriter(ctx)
+
+	for _, chunk := range chunkViews {
+
+		fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
+		if err != nil {
+			return err
+		}
+
+		var writeErr error
+		_, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
+			_, err := writer.Write(data)
+			if err != nil {
+				writeErr = err
+			}
+		})
+
+		if readErr != nil {
+			return readErr
+		}
+		if writeErr != nil {
+			return writeErr
+		}
+
+	}
+
+	return writer.Close()
+
+}
+
+func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+	// TODO improve efficiency
+	return false, nil
+}
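For reference, below is a minimal sketch of the blazer upload pattern that `CreateEntry` relies on: client → bucket → object → writer → `Close()` to finalize the upload. The credentials, bucket name, and object key here are placeholders for illustration only, not values from the commit; the real sink reads `account_id`, `account_key`, `bucket`, and `directory` from its configuration and streams chunk data via `util.ReadUrlAsStream` instead of a fixed byte slice.

```go
package main

import (
	"context"
	"log"

	"github.com/kurin/blazer/b2"
)

func main() {
	ctx := context.Background()

	// Placeholder credentials (assumption); the sink gets these from configuration.
	client, err := b2.NewClient(ctx, "ACCOUNT_ID", "ACCOUNT_KEY")
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder bucket name (assumption).
	bucket, err := client.Bucket(ctx, "replication-bucket")
	if err != nil {
		log.Fatal(err)
	}

	// Same pattern as CreateEntry in the diff: obtain an object handle,
	// stream bytes into its writer, then Close to commit the upload.
	w := bucket.Object("dir/file.txt").NewWriter(ctx)
	if _, err := w.Write([]byte("replicated content\n")); err != nil {
		w.Close()
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}
```

Note that in this pattern nothing is committed to B2 until `Close()` returns without error, which is why `CreateEntry` only closes the writer after all chunk reads and writes have succeeded.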
