diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-01 01:29:22 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-01 01:29:22 -0700 |
| commit | 3bd48c4f2925a725b347dff4e4d66928591e1598 (patch) | |
| tree | a77204e2691b05a08c25fda0de7eafe512b4c705 /weed/remote_storage/track_sync_offset.go | |
| parent | 3faaa6e3601d233805c4358b32a9b853add579ff (diff) | |
| download | seaweedfs-3bd48c4f2925a725b347dff4e4d66928591e1598.tar.xz seaweedfs-3bd48c4f2925a725b347dff4e4d66928591e1598.zip | |
filer.remote.sync: exit when directory is unmounted
this will not propagate the deletions back to the cloud
Diffstat (limited to 'weed/remote_storage/track_sync_offset.go')
| -rw-r--r-- | weed/remote_storage/track_sync_offset.go | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go new file mode 100644 index 000000000..2dfb6d784 --- /dev/null +++ b/weed/remote_storage/track_sync_offset.go @@ -0,0 +1,73 @@ +package remote_storage + +import ( + "context" + "errors" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +const ( + SyncKeyPrefix = "remote.sync." +) + +func GetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string) (lastOffsetTsNs int64, readErr error) { + + dirHash := uint32(util.HashStringToLong(dir)) + + readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + syncKey := []byte(SyncKeyPrefix + "____") + util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) + + resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) + if err != nil { + return err + } + + if len(resp.Error) != 0 { + return errors.New(resp.Error) + } + if len(resp.Value) < 8 { + return nil + } + + lastOffsetTsNs = int64(util.BytesToUint64(resp.Value)) + + return nil + }) + + return + +} + +func SetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string, offsetTsNs int64) error { + + dirHash := uint32(util.HashStringToLong(dir)) + + return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + syncKey := []byte(SyncKeyPrefix + "____") + util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) + + valueBuf := make([]byte, 8) + util.Uint64toBytes(valueBuf, uint64(offsetTsNs)) + + resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{ + Key: syncKey, + Value: valueBuf, + }) + if err != nil { + return err + } + + if len(resp.Error) != 0 { + return errors.New(resp.Error) + } + + return nil + + }) + +} |
