aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/Makefile4
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/filer.go1
-rw-r--r--weed/command/filer_remote_sync.go219
-rw-r--r--weed/command/filer_replication.go6
-rw-r--r--weed/command/imports.go31
-rw-r--r--weed/filer/filer_remote_storage.go1
-rw-r--r--weed/filer/stream.go32
-rw-r--r--weed/remote_storage/remote_storage.go6
-rw-r--r--weed/remote_storage/s3/s3_storage_client.go94
-rw-r--r--weed/shell/command_remote_configure.go11
-rw-r--r--weed/shell/command_remote_mount.go5
12 files changed, 381 insertions, 30 deletions
diff --git a/weed/Makefile b/weed/Makefile
index c82735a0e..ad1f0b6f9 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -37,3 +37,7 @@ debug_s3:
debug_filer_copy:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h
+
+debug_filer_remote_sync:
+ go build -gcflags="all=-N -l"
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.remote.sync -filer="localhost:8888" -dir=/buckets/b2 -timeAgo=10000h
diff --git a/weed/command/command.go b/weed/command/command.go
index 9ae93fe61..02de2bd35 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -21,6 +21,7 @@ var Commands = []*Command{
cmdFilerCopy,
cmdFilerMetaBackup,
cmdFilerMetaTail,
+ cmdFilerRemoteSynchronize,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 4fd2f9c72..ddee0852c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -3,7 +3,6 @@ package command
import (
"fmt"
"net/http"
- _ "net/http/pprof"
"os"
"strconv"
"strings"
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
new file mode 100644
index 000000000..8d176ce2a
--- /dev/null
+++ b/weed/command/filer_remote_sync.go
@@ -0,0 +1,219 @@
+package command
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+ "strings"
+ "time"
+)
+
+type RemoteSyncOptions struct {
+ filerAddress *string
+ grpcDialOption grpc.DialOption
+ readChunkFromFiler *bool
+ debug *bool
+ timeAgo *time.Duration
+ dir *string
+}
+
+const (
+ RemoteSyncKeyPrefix = "remote.sync."
+)
+
+var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
+
+func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+}
+func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
+var (
+ remoteSyncOptions RemoteSyncOptions
+)
+
+func init() {
+ cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
+ remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
+ remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+ remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
+ remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
+ remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerRemoteSynchronize = &Command{
+ UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
+ Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage",
+ Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage
+
+ filer.remote.sync listens on filer update events.
+ If any mounted remote file is updated, it will fetch the updated content,
+ and write to the remote storage.
+`,
+}
+
+func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ remoteSyncOptions.grpcDialOption = grpcDialOption
+
+ // read filer remote storage mount mappings
+ mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
+ if readErr != nil {
+ fmt.Printf("read mount mapping: %v", readErr)
+ return false
+ }
+
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(
+ *remoteSyncOptions.filerAddress,
+ pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+ "/", // does not matter
+ *remoteSyncOptions.readChunkFromFiler,
+ )
+
+ var found bool
+ for dir, remoteStorageMountLocation := range mappings.Mappings {
+ if *remoteSyncOptions.dir == dir {
+ found = true
+ storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
+ if readErr != nil {
+ fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
+ continue
+ }
+ fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
+ if err := util.Retry("filer.remote.sync "+dir, func() error {
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
+ }); err != nil {
+ fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
+ }
+ break
+ }
+ }
+ if !found {
+ fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
+ return false
+ }
+
+ return true
+}
+
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
+
+ dirHash := util.HashStringToLong(mountedDir)
+
+ // 1. specified by timeAgo
+ // 2. last offset timestamp for this directory
+ // 3. directory creation time
+ var lastOffsetTs time.Time
+ if *option.timeAgo == 0 {
+ mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ if err != nil {
+ return fmt.Errorf("lookup %s: %v", mountedDir, err)
+ }
+
+ lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
+ } else {
+ lastOffsetTs = time.Now().Add(-*option.timeAgo)
+ }
+
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return err
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if len(message.NewEntry.Chunks) == 0 {
+ return nil
+ }
+ fmt.Printf("create: %+v\n", resp)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ return client.WriteFile(dest, message.NewEntry, reader)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ fmt.Printf("delete: %+v\n", resp)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ return client.DeleteFile(dest)
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
+ fmt.Printf("update meta: %+v\n", resp)
+ return client.UpdateFileMetadata(dest, message.NewEntry)
+ }
+ }
+ fmt.Printf("update: %+v\n", resp)
+ if err := client.DeleteFile(oldDest); err != nil {
+ return err
+ }
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ return client.WriteFile(dest, message.NewEntry, reader)
+ }
+
+ return nil
+ }
+
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+ return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
+ })
+
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
+ "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
+ var dest string
+ source := string(sourcePath[len(mountDir):])
+ if strings.HasSuffix(remoteMountLocation.Path, "/") {
+ dest = remoteMountLocation.Path + source[1:]
+ } else {
+ dest = remoteMountLocation.Path + source
+ }
+ return &filer_pb.RemoteStorageLocation{
+ Name: remoteMountLocation.Name,
+ Bucket: remoteMountLocation.Bucket,
+ Path: dest,
+ }
+}
+
+func isSameChunks(a, b []*filer_pb.FileChunk) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i := 0; i < len(a); i++ {
+ x, y := a[i], b[i]
+ if !proto.Equal(x, y) {
+ return false
+ }
+ }
+ return true
+}
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 885c95540..bf0a3e140 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -7,12 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
)
diff --git a/weed/command/imports.go b/weed/command/imports.go
new file mode 100644
index 000000000..d7ade1379
--- /dev/null
+++ b/weed/command/imports.go
@@ -0,0 +1,31 @@
+package command
+
+import (
+ _ "net/http/pprof"
+
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
+
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+
+) \ No newline at end of file
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go
index b1ee96a42..573dcf3e7 100644
--- a/weed/filer/filer_remote_storage.go
+++ b/weed/filer/filer_remote_storage.go
@@ -5,7 +5,6 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
- _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 3859f9a67..503e6b23f 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
type ChunkStreamReader struct {
chunkViews []*ChunkView
totalSize int64
+ logicOffset int64
buffer []byte
bufferOffset int64
bufferPos int
@@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
- _, err = c.Seek(off, io.SeekStart)
- if err != nil {
+ if err = c.prepareBufferFor(c.logicOffset); err != nil {
return
}
return c.Read(p)
@@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
return n, io.EOF
}
chunkView := c.chunkViews[c.nextChunkViewIndex]
- c.fetchChunkToBuffer(chunkView)
+ if err = c.fetchChunkToBuffer(chunkView); err != nil {
+ return
+ }
c.nextChunkViewIndex++
}
t := copy(p[n:], c.buffer[c.bufferPos:])
c.bufferPos += t
n += t
+ c.logicOffset += int64(t)
}
return
}
@@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
- offset += c.bufferOffset + int64(c.bufferPos)
+ offset += c.logicOffset
case io.SeekEnd:
offset = c.totalSize + offset
}
if offset > c.totalSize {
err = io.ErrUnexpectedEOF
+ } else {
+ c.logicOffset = offset
}
+ return offset, err
+
+}
+
+func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
// stay in the same chunk
if !c.isBufferEmpty() {
if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
c.bufferPos = int(offset - c.bufferOffset)
- return offset, nil
+ return nil
}
}
@@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
return c.chunkViews[i].LogicOffset <= offset
})
if currentChunkIndex == len(c.chunkViews) {
- return 0, io.EOF
+ return io.EOF
}
// positioning within the new chunk
chunk := c.chunkViews[currentChunkIndex]
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
- c.fetchChunkToBuffer(chunk)
+ if err = c.fetchChunkToBuffer(chunk); err != nil {
+ return
+ }
c.nextChunkViewIndex = currentChunkIndex + 1
}
c.bufferPos = int(offset - c.bufferOffset)
- } else {
- return 0, io.ErrUnexpectedEOF
}
-
- return offset, err
-
+ return
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
index 06c089d7a..608d158ad 100644
--- a/weed/remote_storage/remote_storage.go
+++ b/weed/remote_storage/remote_storage.go
@@ -3,6 +3,7 @@ package remote_storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
"strings"
"sync"
)
@@ -30,7 +31,10 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface {
Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
- ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error)
+ ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
+ WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error)
+ UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
+ DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
}
type RemoteStorageClientMaker interface {
diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go
index 2263054f3..316751227 100644
--- a/weed/remote_storage/s3/s3_storage_client.go
+++ b/weed/remote_storage/s3/s3_storage_client.go
@@ -8,9 +8,11 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
)
func init() {
@@ -45,7 +47,9 @@ type s3RemoteStorageClient struct {
conn s3iface.S3API
}
-func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
+var _ = remote_storage.RemoteStorageClient(&s3RemoteStorageClient{})
+
+func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := remote.Path[1:]
@@ -91,19 +95,19 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation,
}
return
}
-func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) {
+func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
u.PartSize = int64(4 * 1024 * 1024)
u.Concurrency = 1
})
-
+
dataSlice := make([]byte, int(size))
writerAt := aws.NewWriteAtBuffer(dataSlice)
_, err = downloader.Download(writerAt, &s3.GetObjectInput{
- Bucket: aws.String(loc.Bucket),
- Key: aws.String(loc.Path[1:]),
- Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
})
if err != nil {
return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
@@ -111,3 +115,81 @@ func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, off
return writerAt.Bytes(), nil
}
+
+func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) {
+
+ fileSize := int64(filer.FileSize(entry))
+
+ partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+ for partSize*1000 < fileSize {
+ partSize *= 4
+ }
+
+ // Create an uploader with the session and custom options
+ uploader := s3manager.NewUploaderWithClient(s.conn, func(u *s3manager.Uploader) {
+ u.PartSize = partSize
+ u.Concurrency = 5
+ })
+
+ // process tagging
+ tags := ""
+ for k, v := range entry.Extended {
+ if len(tags) > 0 {
+ tags = tags + "&"
+ }
+ tags = tags + k + "=" + string(v)
+ }
+
+ // Upload the file to S3.
+ _, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ Body: reader,
+ ACL: aws.String("private"),
+ ServerSideEncryption: aws.String("AES256"),
+ StorageClass: aws.String("STANDARD_IA"),
+ Tagging: aws.String(tags),
+ })
+
+ //in case it fails to upload
+ if err != nil {
+ return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
+ }
+
+ return nil
+}
+
+func toTagging(attributes map[string][]byte) *s3.Tagging {
+ tagging := &s3.Tagging{}
+ for k, v := range attributes {
+ tagging.TagSet = append(tagging.TagSet, &s3.Tag{
+ Key: aws.String(k),
+ Value: aws.String(string(v)),
+ })
+ }
+ return tagging
+}
+
+func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
+ tagging := toTagging(entry.Extended)
+ if len(tagging.TagSet) > 0 {
+ _, err = s.conn.PutObjectTagging(&s3.PutObjectTaggingInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ Tagging: toTagging(entry.Extended),
+ })
+ } else {
+ _, err = s.conn.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ })
+ }
+ return
+}
+func (s *s3RemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
+ _, err = s.conn.DeleteObject(&s3.DeleteObjectInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ })
+ return
+}
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index 20ded5f5b..7a9ad1f65 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"io"
"regexp"
@@ -96,9 +97,15 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE
conf.S3SecretKey = ""
- fmt.Fprintf(writer, "%+v\n", conf)
+ m := jsonpb.Marshaler{
+ EmitDefaults: false,
+ Indent: " ",
+ }
- return nil
+ err := m.Marshal(writer, conf)
+ fmt.Fprintln(writer)
+
+ return err
})
}
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
index 73a5119d5..37b235a55 100644
--- a/weed/shell/command_remote_mount.go
+++ b/weed/shell/command_remote_mount.go
@@ -88,7 +88,10 @@ func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *Command
Indent: " ",
}
- return m.Marshal(writer, mappings)
+ err = m.Marshal(writer, mappings)
+ fmt.Fprintln(writer)
+
+ return
}