aboutsummaryrefslogtreecommitdiff
path: root/weed/remote_storage/track_sync_offset.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-01 01:29:22 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-01 01:29:22 -0700
commit3bd48c4f2925a725b347dff4e4d66928591e1598 (patch)
treea77204e2691b05a08c25fda0de7eafe512b4c705 /weed/remote_storage/track_sync_offset.go
parent3faaa6e3601d233805c4358b32a9b853add579ff (diff)
downloadseaweedfs-3bd48c4f2925a725b347dff4e4d66928591e1598.tar.xz
seaweedfs-3bd48c4f2925a725b347dff4e4d66928591e1598.zip
filer.remote.sync: exit when directory is unmounted
this will not propagate the deletions back to the cloud
Diffstat (limited to 'weed/remote_storage/track_sync_offset.go')
-rw-r--r--weed/remote_storage/track_sync_offset.go73
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go
new file mode 100644
index 000000000..2dfb6d784
--- /dev/null
+++ b/weed/remote_storage/track_sync_offset.go
@@ -0,0 +1,73 @@
+package remote_storage
+
+import (
+ "context"
+ "errors"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+)
+
+const (
+ SyncKeyPrefix = "remote.sync."
+)
+
+func GetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string) (lastOffsetTsNs int64, readErr error) {
+
+ dirHash := uint32(util.HashStringToLong(dir))
+
+ readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ syncKey := []byte(SyncKeyPrefix + "____")
+ util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
+
+ resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
+ if err != nil {
+ return err
+ }
+
+ if len(resp.Error) != 0 {
+ return errors.New(resp.Error)
+ }
+ if len(resp.Value) < 8 {
+ return nil
+ }
+
+ lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
+
+ return nil
+ })
+
+ return
+
+}
+
+func SetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string, offsetTsNs int64) error {
+
+ dirHash := uint32(util.HashStringToLong(dir))
+
+ return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ syncKey := []byte(SyncKeyPrefix + "____")
+ util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
+
+ valueBuf := make([]byte, 8)
+ util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
+
+ resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
+ Key: syncKey,
+ Value: valueBuf,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(resp.Error) != 0 {
+ return errors.New(resp.Error)
+ }
+
+ return nil
+
+ })
+
+}