aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-08-08 17:55:03 -0700
committerChris Lu <chris.lu@gmail.com>2021-08-08 17:55:03 -0700
commit7412ccdf88d5096e0d0d9475a77322188f9c8a5d (patch)
tree0042cebede2c10781443253254474f26dc277df6
parentdcf614a8c39d85bd10884d7feaa6aac8217c2946 (diff)
downloadseaweedfs-7412ccdf88d5096e0d0d9475a77322188f9c8a5d.tar.xz
seaweedfs-7412ccdf88d5096e0d0d9475a77322188f9c8a5d.zip
write back remote entry to local entry after uploading to remote
-rw-r--r--weed/command/filer_remote_sync.go24
-rw-r--r--weed/remote_storage/remote_storage.go2
-rw-r--r--weed/remote_storage/s3/s3_storage_client.go26
3 files changed, 46 insertions, 6 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index be8d3faff..4afb7c091 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -157,7 +158,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
}
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- return client.WriteFile(dest, message.NewEntry, reader)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
if message.OldEntry != nil && message.NewEntry == nil {
fmt.Printf("delete: %+v\n", resp)
@@ -182,7 +187,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return err
}
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- return client.WriteFile(dest, message.NewEntry, reader)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
return nil
@@ -238,3 +247,14 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool {
}
return false
}
+
+func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+ entry.RemoteEntry = remoteEntry
+ return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ return err
+ })
+} \ No newline at end of file
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
index 608d158ad..c94260ac0 100644
--- a/weed/remote_storage/remote_storage.go
+++ b/weed/remote_storage/remote_storage.go
@@ -32,7 +32,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface {
Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
- WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error)
+ WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error)
UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
}
diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go
index 316751227..5be64406e 100644
--- a/weed/remote_storage/s3/s3_storage_client.go
+++ b/weed/remote_storage/s3/s3_storage_client.go
@@ -116,7 +116,7 @@ func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, of
return writerAt.Bytes(), nil
}
-func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) {
+func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
fileSize := int64(filer.FileSize(entry))
@@ -153,10 +153,12 @@ func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, e
//in case it fails to upload
if err != nil {
- return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
+ return nil, fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
}
- return nil
+ // read back the remote entry
+ return s.readFileRemoteEntry(loc)
+
}
func toTagging(attributes map[string][]byte) *s3.Tagging {
@@ -170,6 +172,24 @@ func toTagging(attributes map[string][]byte) *s3.Tagging {
return tagging
}
+func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
+ resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return &filer_pb.RemoteEntry{
+ LastModifiedAt: resp.LastModified.Unix(),
+ Size: *resp.ContentLength,
+ ETag: *resp.ETag,
+ StorageName: s.conf.Name,
+ }, nil
+
+}
+
func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
tagging := toTagging(entry.Extended)
if len(tagging.TagSet) > 0 {