author     hilimd <68371223+hilimd@users.noreply.github.com>  2021-09-13 10:34:33 +0800
committer  GitHub <noreply@github.com>                        2021-09-13 10:34:33 +0800
commit     1de733fda507e1da94b2e4741c74ba7e5e2c5f76 (patch)
tree       aed7ac29e27e0f8def942154603375396fae9489 /weed/filer
parent     27c05f8c0b5c7bda43babeb61d79684d11851111 (diff)
parent     7591336a2269c1ad92266280634bcaea34f7a5d1 (diff)
Merge pull request #81 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/etcd/etcd_store.go                 |   2
-rw-r--r--  weed/filer/filechunk_manifest.go              |   5
-rw-r--r--  weed/filer/filer_notify_append.go             |  11
-rw-r--r--  weed/filer/leveldb/leveldb_store.go           |   7
-rw-r--r--  weed/filer/leveldb2/leveldb2_store.go         |   7
-rw-r--r--  weed/filer/leveldb3/leveldb3_store.go         |  14
-rw-r--r--  weed/filer/read_remote.go                     |  23
-rw-r--r--  weed/filer/remote_mapping.go                  | 121
-rw-r--r--  weed/filer/remote_storage.go (renamed from weed/filer/filer_remote_storage.go)           | 110
-rw-r--r--  weed/filer/remote_storage_test.go (renamed from weed/filer/filer_remote_storage_test.go) |   6
-rw-r--r--  weed/filer/sqlite/sqlite_store.go             |   1
-rw-r--r--  weed/filer/sqlite/sqlite_store_unsupported.go |   1
-rw-r--r--  weed/filer/stream.go                          | 143
-rw-r--r--  weed/filer/tikv/tikv.go                       |   5
-rw-r--r--  weed/filer/tikv/tikv_store.go                 | 389
-rw-r--r--  weed/filer/tikv/tikv_store_kv.go              |  50
16 files changed, 759 insertions(+), 136 deletions(-)
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 71ed738f9..2a5dfc926 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "go.etcd.io/etcd/clientv3"
+ "go.etcd.io/etcd/client/v3"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 00dbf1fd6..32008271b 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -6,6 +6,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
+ "net/url"
+ "strings"
"time"
"github.com/golang/protobuf/proto"
@@ -108,6 +110,9 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
receivedData = receivedData[:0]
+ if strings.Contains(urlString, "%") {
+ urlString = url.PathEscape(urlString)
+ }
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
receivedData = append(receivedData, data...)
})
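The change above percent-escapes a chunk URL before fetching whenever it contains a "%". For context, the enclosing loop retries every replica URL with a geometrically growing wait; a standalone sketch of that cadence, with a stand-in constant for util.RetryWaitTime:

package main

import (
	"fmt"
	"time"
)

func main() {
	const retryWaitTime = 6 * time.Second // stand-in for util.RetryWaitTime
	// Same growth rule as retriedFetchChunkData: 1s, 1.5s, 2.25s, ...
	for waitTime := time.Second; waitTime < retryWaitTime; waitTime += waitTime / 2 {
		fmt.Println("attempt round, next wait:", waitTime)
	}
}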
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index d441bbbc9..73762cde7 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -66,7 +66,16 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
// upload data
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: "",
+ Cipher: f.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: assignResult.Auth,
+ }
+ uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
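This hunk replaces a seven-argument upload call with an operation.UploadOption struct, so call sites name what they set and skip zero values. A self-contained illustration of the pattern with stand-in types (field names mirror the hunk; the real operation package differs in return values and field types such as the JWT):

package main

import "fmt"

// Stand-in type mirroring the option fields visible in the hunk above.
type UploadOption struct {
	UploadUrl         string
	Filename          string
	Cipher            bool
	IsInputCompressed bool
	MimeType          string
	PairMap           map[string]string
	Jwt               string
}

func UploadData(data []byte, option *UploadOption) error {
	// Named fields make the call site self-describing; omitted fields
	// keep their zero values instead of being passed as "" and nil.
	fmt.Printf("PUT %d bytes -> %s (cipher=%v)\n", len(data), option.UploadUrl, option.Cipher)
	return nil
}

func main() {
	_ = UploadData([]byte("hello"), &UploadOption{
		UploadUrl: "http://volume-server:8080/3,01637037d6", // placeholder URL
		Cipher:    true,
	})
}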
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index ce454f36a..9a7670d42 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/syndtr/goleveldb/leveldb"
leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
"os"
@@ -45,9 +46,9 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
}
opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10,
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ Filter: filter.NewBloomFilter(8), // false positive rate 0.02
}
if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
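The LevelDB stores swap the CompactionTableSizeMultiplier tweak for a Bloom filter at 8 bits per key (roughly a 2% false-positive rate, per the comment), which lets point lookups skip SSTables that cannot contain the key. A minimal sketch of opening goleveldb with these options; the path is a placeholder:

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	opts := &opt.Options{
		BlockCacheCapacity: 32 * 1024 * 1024,         // default is 8 MiB
		WriteBuffer:        16 * 1024 * 1024,         // default is 4 MiB
		Filter:             filter.NewBloomFilter(8), // ~0.02 false positive rate
	}
	db, err := leveldb.OpenFile("/tmp/filer-leveldb", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}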
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 4c4409c4d..966686ed9 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -46,10 +46,9 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
}
opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- Filter: filter.NewBloomFilter(8), // false positive rate 0.02
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ Filter: filter.NewBloomFilter(8), // false positive rate 0.02
}
for d := 0; d < dbCount; d++ {
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index bc57a6605..86e2b584b 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -66,17 +66,15 @@ func (store *LevelDB3Store) initialize(dir string) (err error) {
func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
bloom := filter.NewBloomFilter(8) // false positive rate 0.02
opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- Filter: bloom,
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ Filter: bloom,
}
if name != DEFAULT {
opts = &opt.Options{
- BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- Filter: bloom,
+ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
+ Filter: bloom,
}
}
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
index eee536ff6..58639024b 100644
--- a/weed/filer/read_remote.go
+++ b/weed/filer/read_remote.go
@@ -2,8 +2,8 @@ package filer
import (
"context"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -11,21 +11,8 @@ func (entry *Entry) IsInRemoteOnly() bool {
return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.RemoteSize > 0
}
-func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data []byte, err error) {
- client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName)
- if !found {
- return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
- }
-
- mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
-
- sourceLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoation, entry.FullPath)
-
- return client.ReadFile(sourceLoc, offset, size)
-}
-
-func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, fp util.FullPath) *filer_pb.RemoteStorageLocation {
- remoteLocation := &filer_pb.RemoteStorageLocation{
+func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, fp util.FullPath) *remote_pb.RemoteStorageLocation {
+ remoteLocation := &remote_pb.RemoteStorageLocation{
Name: remoteMountedLocation.Name,
Bucket: remoteMountedLocation.Bucket,
Path: remoteMountedLocation.Path,
@@ -34,11 +21,11 @@ func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMou
return remoteLocation
}
-func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, remoteLocationPath string)(fp util.FullPath) {
+func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string) (fp util.FullPath) {
return localMountedDir.Child(remoteLocationPath[len(remoteMountedLocation.Path):])
}
-func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *filer_pb.RemoteConf, remoteLocation *filer_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
+func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
_, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{
Directory: string(parent),
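The read_remote.go changes move the remote-storage types into the remote_pb package and drop the Filer.ReadRemote helper. The two mapping functions translate between a filer path under a mount point and a path under the remote base path. A worked sketch of that arithmetic, assuming the elided body of MapFullPathToRemoteStorageLocation appends the suffix below the mount point onto the remote base path (the inverse of MapRemoteStorageLocationPathToFullPath shown above); all paths are placeholders:

package main

import (
	"fmt"
	"strings"
)

func main() {
	localMountedDir := "/buckets/b1"       // filer mount point
	remoteBasePath := "/dir"               // remoteMountedLocation.Path
	fullPath := "/buckets/b1/sub/file.txt" // entry path under the mount

	// local -> remote: strip the mount dir, append under the remote base path
	remotePath := remoteBasePath + strings.TrimPrefix(fullPath, localMountedDir)
	fmt.Println(remotePath) // /dir/sub/file.txt

	// remote -> local: strip the remote base path, append under the mount dir
	backToLocal := localMountedDir + strings.TrimPrefix(remotePath, remoteBasePath)
	fmt.Println(backToLocal) // /buckets/b1/sub/file.txt
}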
diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go
new file mode 100644
index 000000000..fb74dca98
--- /dev/null
+++ b/weed/filer/remote_mapping.go
@@ -0,0 +1,121 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+)
+
+func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
+ var oldContent []byte
+ if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return readErr
+ }); readErr != nil {
+ return nil, readErr
+ }
+
+ mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
+ if readErr != nil {
+ return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+
+ return
+}
+
+func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) {
+
+ // read current mapping
+ var oldContent, newContent []byte
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return err
+ })
+ if err != nil {
+ if err != filer_pb.ErrNotFound {
+ return fmt.Errorf("read existing mapping: %v", err)
+ }
+ }
+
+ // add new mapping
+ newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
+ if err != nil {
+ return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
+ }
+
+ // save back
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
+ })
+ if err != nil {
+ return fmt.Errorf("save mapping: %v", err)
+ }
+
+ return nil
+}
+
+func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) {
+
+ // read current mapping
+ var oldContent, newContent []byte
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return err
+ })
+ if err != nil {
+ if err != filer_pb.ErrNotFound {
+ return fmt.Errorf("read existing mapping: %v", err)
+ }
+ }
+
+ // add new mapping
+ newContent, err = removeRemoteStorageMapping(oldContent, dir)
+ if err != nil {
+ return fmt.Errorf("delete mount %s: %v", dir, err)
+ }
+
+ // save back
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
+ })
+ if err != nil {
+ return fmt.Errorf("save mapping: %v", err)
+ }
+
+ return nil
+}
+
+func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) {
+ mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+ if unmarshalErr != nil {
+ // skip
+ }
+
+ // set the new mapping
+ mappings.Mappings[dir] = storageLocation
+
+ if newContent, err = proto.Marshal(mappings); err != nil {
+ return oldContent, fmt.Errorf("marshal mappings: %v", err)
+ }
+
+ return
+}
+
+func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
+ mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+ if unmarshalErr != nil {
+ return nil, unmarshalErr
+ }
+
+ // set the new mapping
+ delete(mappings.Mappings, dir)
+
+ if newContent, err = proto.Marshal(mappings); err != nil {
+ return oldContent, fmt.Errorf("marshal mappings: %v", err)
+ }
+
+ return
+}
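The new remote_mapping.go centralizes the read-modify-write cycle on the mount.mapping file under /etc/remote: read the current proto blob, add or remove one directory mapping, and save the result back through the filer. A hedged usage sketch, borrowing the location values from the test further below; the mount directory and filer address are placeholders:

package example

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"google.golang.org/grpc"
)

func mountAndList(filerClient filer_pb.FilerClient, dialOpt grpc.DialOption) {
	loc := &remote_pb.RemoteStorageLocation{Name: "s7", Bucket: "some", Path: "/dir"}
	if err := filer.InsertMountMapping(filerClient, "/buckets/b1", loc); err != nil {
		log.Fatal(err)
	}
	mappings, err := filer.ReadMountMappings(dialOpt, "localhost:8888")
	if err != nil {
		log.Fatal(err)
	}
	for dir, l := range mappings.Mappings {
		fmt.Printf("%s -> %s/%s%s\n", dir, l.Name, l.Bucket, l.Path)
	}
}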
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/remote_storage.go
index 99ea1d3bb..4ff21f3b3 100644
--- a/weed/filer/filer_remote_storage.go
+++ b/weed/filer/remote_storage.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -21,13 +22,13 @@ const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
type FilerRemoteStorage struct {
rules ptrie.Trie
- storageNameToConf map[string]*filer_pb.RemoteConf
+ storageNameToConf map[string]*remote_pb.RemoteConf
}
func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
rs = &FilerRemoteStorage{
rules: ptrie.New(),
- storageNameToConf: make(map[string]*filer_pb.RemoteConf),
+ storageNameToConf: make(map[string]*remote_pb.RemoteConf),
}
return rs
}
@@ -56,7 +57,7 @@ func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *F
if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
return nil
}
- conf := &filer_pb.RemoteConf{}
+ conf := &remote_pb.RemoteConf{}
if err := proto.Unmarshal(entry.Content, conf); err != nil {
return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
}
@@ -66,7 +67,7 @@ func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *F
}
func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
- mappings := &filer_pb.RemoteStorageMapping{}
+ mappings := &remote_pb.RemoteStorageMapping{}
if err := proto.Unmarshal(data, mappings); err != nil {
return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
}
@@ -76,23 +77,23 @@ func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err er
return nil
}
-func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) {
+func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *remote_pb.RemoteStorageLocation) {
rs.rules.Put([]byte(dir+"/"), loc)
}
-func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
+func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) {
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
mountDir = util.FullPath(string(key[:len(key)-1]))
- remoteLocation = value.(*filer_pb.RemoteStorageLocation)
+ remoteLocation = value.(*remote_pb.RemoteStorageLocation)
return true
})
return
}
-func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
- var storageLocation *filer_pb.RemoteStorageLocation
+func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
+ var storageLocation *remote_pb.RemoteStorageLocation
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
- storageLocation = value.(*filer_pb.RemoteStorageLocation)
+ storageLocation = value.(*remote_pb.RemoteStorageLocation)
return true
})
@@ -104,7 +105,7 @@ func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client r
return rs.GetRemoteStorageClient(storageLocation.Name)
}
-func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
remoteConf, found = rs.storageNameToConf[storageName]
if !found {
return
@@ -118,9 +119,9 @@ func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client
return
}
-func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) {
- mappings = &filer_pb.RemoteStorageMapping{
- Mappings: make(map[string]*filer_pb.RemoteStorageLocation),
+func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error) {
+ mappings = &remote_pb.RemoteStorageMapping{
+ Mappings: make(map[string]*remote_pb.RemoteStorageLocation),
}
if len(oldContent) > 0 {
if err = proto.Unmarshal(oldContent, mappings); err != nil {
@@ -130,70 +131,51 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.Remot
return
}
-func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
- mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
- if unmarshalErr != nil {
- // skip
- }
-
- // set the new mapping
- mappings.Mappings[dir] = storageLocation
-
- if newContent, err = proto.Marshal(mappings); err != nil {
- return oldContent, fmt.Errorf("marshal mappings: %v", err)
- }
-
- return
-}
-
-func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
- mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
- if unmarshalErr != nil {
- return nil, unmarshalErr
- }
-
- // set the new mapping
- delete(mappings.Mappings, dir)
-
- if newContent, err = proto.Marshal(mappings); err != nil {
- return oldContent, fmt.Errorf("marshal mappings: %v", err)
- }
-
- return
-}
-
-func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
+func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {
return nil, readErr
}
- mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
- if readErr != nil {
- return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+ // unmarshal storage configuration
+ conf = &remote_pb.RemoteConf{}
+ if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
+ readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
+ return
}
return
}
-func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
- var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
- return readErr
- }); readErr != nil {
- return nil, readErr
+func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress string, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
+
+ mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress)
+ if listErr != nil {
+ return nil, "", nil, nil, listErr
+ }
+ if dir == "" {
+ return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option")
}
- // unmarshal storage configuration
- conf = &filer_pb.RemoteConf{}
- if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
- readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
- return
+ var localMountedDir string
+ var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
+ for k, loc := range mappings.Mappings {
+ if strings.HasPrefix(dir, k) {
+ localMountedDir, remoteStorageMountedLocation = k, loc
+ }
+ }
+ if localMountedDir == "" {
+ return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir)
}
- return
+ // find remote storage configuration
+ remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name)
+ if err != nil {
+ return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err
+ }
+
+ return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
}
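The new DetectMountInfo bundles the common lookup chain: load all mount mappings, find the mapping whose directory prefixes the requested dir, then load that storage's configuration. A hedged sketch of a call site; the filer address and directory are placeholders:

package example

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"google.golang.org/grpc"
)

func showMount(dialOpt grpc.DialOption) {
	mappings, localDir, remoteLoc, conf, err := filer.DetectMountInfo(
		dialOpt, "localhost:8888", "/buckets/b1/sub")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d mount(s); %s maps to %s bucket %s path %s (type %s)\n",
		len(mappings.Mappings), localDir, remoteLoc.Name, remoteLoc.Bucket,
		remoteLoc.Path, conf.Type)
}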
diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/remote_storage_test.go
index 35ffc7538..9f4d7af2f 100644
--- a/weed/filer/filer_remote_storage_test.go
+++ b/weed/filer/remote_storage_test.go
@@ -1,20 +1,20 @@
package filer
import (
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/stretchr/testify/assert"
"testing"
)
func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
- conf := &filer_pb.RemoteConf{
+ conf := &remote_pb.RemoteConf{
Name: "s7",
Type: "s3",
}
rs := NewFilerRemoteStorage()
rs.storageNameToConf[conf.Name] = conf
- rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{
+ rs.mapDirectoryToRemoteStorage("/a/b/c", &remote_pb.RemoteStorageLocation{
Name: "s7",
Bucket: "some",
Path: "/dir",
diff --git a/weed/filer/sqlite/sqlite_store.go b/weed/filer/sqlite/sqlite_store.go
index 6b055e53c..ca9d38786 100644
--- a/weed/filer/sqlite/sqlite_store.go
+++ b/weed/filer/sqlite/sqlite_store.go
@@ -1,3 +1,4 @@
+//go:build linux || darwin || windows
// +build linux darwin windows
// limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go
index 803c71afa..0fba1ea33 100644
--- a/weed/filer/sqlite/sqlite_store_unsupported.go
+++ b/weed/filer/sqlite/sqlite_store_unsupported.go
@@ -1,3 +1,4 @@
+//go:build !linux && !darwin && !windows && !s390 && !ppc64le && !mips64
// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64
// limited GOOS due to modernc.org/libc/unistd
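Both sqlite files gain the //go:build form introduced in Go 1.17 on top of the legacy // +build line, so old and new toolchains evaluate the same constraint (gofmt keeps the two lines in sync). A minimal sketch of the dual syntax:

//go:build linux || darwin || windows
// +build linux darwin windows

// The constraint above compiles this file only on the listed GOOS values,
// matching the limitation noted for modernc.org/libc/unistd.
package sqlite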
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index c61ee3c12..4a70d118e 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -7,6 +7,7 @@ import (
"math"
"sort"
"strings"
+ "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -16,6 +17,49 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
+func HasData(entry *filer_pb.Entry) bool {
+
+ if len(entry.Content) > 0 {
+ return true
+ }
+
+ return len(entry.Chunks) > 0
+}
+
+func IsSameData(a, b *filer_pb.Entry) bool {
+
+ if len(a.Content) > 0 || len(b.Content) > 0 {
+ return bytes.Equal(a.Content, b.Content)
+ }
+
+ return isSameChunks(a.Chunks, b.Chunks)
+}
+
+func isSameChunks(a, b []*filer_pb.FileChunk) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ sort.Slice(a, func(i, j int) bool {
+ return strings.Compare(a[i].ETag, a[j].ETag) < 0
+ })
+ sort.Slice(b, func(i, j int) bool {
+ return strings.Compare(b[i].ETag, b[j].ETag) < 0
+ })
+ for i := 0; i < len(a); i++ {
+ if a[i].ETag != b[i].ETag {
+ return false
+ }
+ }
+ return true
+}
+
+func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
+ if len(entry.Content) > 0 {
+ return bytes.NewReader(entry.Content)
+ }
+ return NewChunkStreamReader(filerClient, entry.Chunks)
+}
+
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
@@ -83,14 +127,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
// ---------------- ChunkStreamReader ----------------------------------
type ChunkStreamReader struct {
- chunkViews []*ChunkView
- totalSize int64
- logicOffset int64
- buffer []byte
- bufferOffset int64
- bufferPos int
- nextChunkViewIndex int
- lookupFileId wdclient.LookupFileIdFunctionType
+ chunkViews []*ChunkView
+ totalSize int64
+ logicOffset int64
+ buffer []byte
+ bufferOffset int64
+ bufferLock sync.Mutex
+ chunk string
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
@@ -132,26 +176,29 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
- if err = c.prepareBufferFor(c.logicOffset); err != nil {
+ c.bufferLock.Lock()
+ defer c.bufferLock.Unlock()
+ if err = c.prepareBufferFor(off); err != nil {
return
}
- return c.Read(p)
+ c.logicOffset = off
+ return c.doRead(p)
}
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
+ c.bufferLock.Lock()
+ defer c.bufferLock.Unlock()
+ return c.doRead(p)
+}
+
+func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
+ // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
for n < len(p) {
- if c.isBufferEmpty() {
- if c.nextChunkViewIndex >= len(c.chunkViews) {
- return n, io.EOF
- }
- chunkView := c.chunkViews[c.nextChunkViewIndex]
- if err = c.fetchChunkToBuffer(chunkView); err != nil {
- return
- }
- c.nextChunkViewIndex++
+ // println("read", c.logicOffset)
+ if err = c.prepareBufferFor(c.logicOffset); err != nil {
+ return
}
- t := copy(p[n:], c.buffer[c.bufferPos:])
- c.bufferPos += t
+ t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:])
n += t
c.logicOffset += int64(t)
}
@@ -159,10 +206,12 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
}
func (c *ChunkStreamReader) isBufferEmpty() bool {
- return len(c.buffer) <= c.bufferPos
+ return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
}
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
+ c.bufferLock.Lock()
+ defer c.bufferLock.Unlock()
var err error
switch whence {
@@ -182,33 +231,59 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
}
+func insideChunk(offset int64, chunk *ChunkView) bool {
+ return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
+}
+
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
// stay in the same chunk
- if !c.isBufferEmpty() {
- if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
- c.bufferPos = int(offset - c.bufferOffset)
- return nil
- }
+ if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
+ return nil
}
+ // fmt.Printf("fetch for offset %d\n", offset)
+
// need to seek to a different chunk
currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
- return c.chunkViews[i].LogicOffset <= offset
+ return offset < c.chunkViews[i].LogicOffset
})
if currentChunkIndex == len(c.chunkViews) {
- return io.EOF
+ // not found
+ if insideChunk(offset, c.chunkViews[0]) {
+ // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
+ currentChunkIndex = 0
+ } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
+ currentChunkIndex = len(c.chunkViews) - 1
+ // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
+ } else {
+ return io.EOF
+ }
+ } else if currentChunkIndex > 0 {
+ if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
+ // good hit
+ } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
+ currentChunkIndex -= 1
+ // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
+ } else {
+ // glog.Fatalf("unexpected1 offset %d", offset)
+ return fmt.Errorf("unexpected1 offset %d", offset)
+ }
+ } else {
+ // glog.Fatalf("unexpected2 offset %d", offset)
+ return fmt.Errorf("unexpected2 offset %d", offset)
}
// positioning within the new chunk
chunk := c.chunkViews[currentChunkIndex]
- if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+ if insideChunk(offset, chunk) {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
if err = c.fetchChunkToBuffer(chunk); err != nil {
return
}
- c.nextChunkViewIndex = currentChunkIndex + 1
}
- c.bufferPos = int(offset - c.bufferOffset)
+ } else {
+ // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
+ return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
}
return
}
@@ -239,10 +314,10 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
c.buffer = buffer.Bytes()
- c.bufferPos = 0
c.bufferOffset = chunkView.LogicOffset
+ c.chunk = chunkView.FileId
- // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+ // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
return nil
}
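In stream.go the reader drops bufferPos and nextChunkViewIndex in favor of absolute offsets, and Read, ReadAt, and Seek all take the new bufferLock, so a single ChunkStreamReader serializes concurrent callers instead of corrupting shared buffer state. A hedged usage sketch; the client and entry are assumed to come from elsewhere:

package example

import (
	"io"
	"log"
	"sync"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func readConcurrently(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) {
	reader := filer.NewChunkStreamReader(filerClient, entry.Chunks)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(off int64) {
			defer wg.Done()
			buf := make([]byte, 1024)
			// ReadAt repositions the buffer under bufferLock, so these
			// goroutines run one at a time against the shared reader.
			n, err := reader.ReadAt(buf, off)
			if err != nil && err != io.EOF {
				log.Printf("read at %d: %v", off, err)
				return
			}
			log.Printf("read %d bytes at offset %d", n, off)
		}(int64(i) * 1024)
	}
	wg.Wait()
}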
diff --git a/weed/filer/tikv/tikv.go b/weed/filer/tikv/tikv.go
new file mode 100644
index 000000000..8bb5dc577
--- /dev/null
+++ b/weed/filer/tikv/tikv.go
@@ -0,0 +1,5 @@
+package tikv
+
+/*
+ * This empty file lets "go build" work without the tikv build tag
+ */
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
new file mode 100644
index 000000000..561f23910
--- /dev/null
+++ b/weed/filer/tikv/tikv_store.go
@@ -0,0 +1,389 @@
+//go:build tikv
+// +build tikv
+
+package tikv
+
+import (
+ "bytes"
+ "context"
+ "crypto/sha1"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tikv/client-go/v2/tikv"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+var (
+ _ filer.FilerStore = ((*TikvStore)(nil))
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &TikvStore{})
+}
+
+type TikvStore struct {
+ client *tikv.KVStore
+ deleteRangeConcurrency int
+}
+
+// Basic APIs
+func (store *TikvStore) GetName() string {
+ return "tikv"
+}
+
+func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
+ pdAddrs := []string{}
+ pdAddrsStr := config.GetString(prefix + "pdaddrs")
+ for _, item := range strings.Split(pdAddrsStr, ",") {
+ pdAddrs = append(pdAddrs, strings.TrimSpace(item))
+ }
+ drc := config.GetInt(prefix + "deleterange_concurrency")
+ if drc <= 0 {
+ drc = 1
+ }
+ store.deleteRangeConcurrency = drc
+ return store.initialize(pdAddrs)
+}
+
+func (store *TikvStore) initialize(pdAddrs []string) error {
+ client, err := tikv.NewTxnClient(pdAddrs)
+ store.client = client
+ return err
+}
+
+func (store *TikvStore) Shutdown() {
+ err := store.client.Close()
+ if err != nil {
+ glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
+ }
+}
+
+// ~ Basic APIs
+
+// Entry APIs
+func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ dir, name := entry.DirAndName()
+ key := generateKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+ return nil
+}
+
+func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var value []byte = nil
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ value = val
+ }
+ return err
+ })
+
+ if isNotExists(err) || value == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", path, err)
+ }
+
+ entry := &filer.Entry{
+ FullPath: path,
+ }
+ err = entry.DecodeAttributesAndChunks(value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+ return entry, nil
+}
+
+func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ return nil
+}
+
+// ~ Entry APIs
+
+// Directory APIs
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
+ directoryPrefix := genDirectoryKeyPrefix(path, "")
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ var (
+ startKey []byte = nil
+ endKey []byte = nil
+ )
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(directoryPrefix, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ endKey = key
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ if startKey == nil {
+ startKey = key
+ }
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+		// Only one key matched, so just delete it.
+ if startKey != nil && bytes.Equal(startKey, endKey) {
+ return txn.Delete(startKey)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+
+ if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
+ // has startKey and endKey and they are not equals, so use delete range
+ _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ }
+ return err
+}
+
+func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ lastFileName := ""
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return lastFileName, err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(lastFileStart, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ i := int64(0)
+ first := true
+ for iter.Valid() {
+ if first {
+ first = false
+ if !includeStartFile {
+ if iter.Valid() {
+ // Check first item is lastFileStart
+ if bytes.Equal(iter.Key(), lastFileStart) {
+						// This key is lastFileStart and the start file is
+						// excluded, so skip it.
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ continue
+ }
+ }
+ }
+ }
+ // Check for limitation
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+ // Validate key prefix
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ value := iter.Value()
+
+ // Start process
+ fileName := getNameFromKey(key)
+ if fileName != "" {
+ // Got file name, then generate the Entry
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), fileName),
+ }
+ // Update lastFileName
+ lastFileName = fileName
+ // Check for decode value.
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+					// Got a decode error; log and return it
+					glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
+					return decodeErr
+ }
+ // Run for each callback if return false just break the iteration
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ // End process
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+ return lastFileName, nil
+}
+
+// ~ Directory APIs
+
+// Transaction Related APIs
+func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.client.Begin()
+ if err != nil {
+ return ctx, err
+ }
+ return context.WithValue(ctx, "tx", tx), nil
+}
+
+func (store *TikvStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Commit(context.Background())
+ }
+ return nil
+}
+
+func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+// ~ Transaction Related APIs
+
+// Transaction Wrapper
+type TxnWrapper struct {
+ *txnkv.KVTxn
+ inContext bool
+}
+
+func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
+ err := f(w.KVTxn)
+ if !w.inContext {
+ if err != nil {
+ w.KVTxn.Rollback()
+ return err
+ }
+ w.KVTxn.Commit(context.Background())
+ return nil
+ }
+ return err
+}
+
+func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return &TxnWrapper{tx, true}, nil
+ }
+ txn, err := store.client.Begin()
+ if err != nil {
+ return nil, err
+ }
+ return &TxnWrapper{txn, false}, nil
+}
+
+// ~ Transaction Wrapper
+
+// Encoding Functions
+func hashToBytes(dir string) []byte {
+ h := sha1.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func generateKey(dirPath, fileName string) []byte {
+ key := hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func getNameFromKey(key []byte) string {
+ return string(key[sha1.Size:])
+}
+
+func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func isNotExists(err error) bool {
+ if err == nil {
+ return false
+ }
+ if err.Error() == "not exist" {
+ return true
+ }
+ return false
+}
+
+// ~ Encoding Functions
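The TiKV store hashes the parent directory and appends the raw file name, so every entry in one directory shares a 20-byte SHA-1 prefix and directory listings become contiguous range scans. A standalone sketch of that key layout, mirroring generateKey and getNameFromKey above:

package main

import (
	"crypto/sha1"
	"fmt"
)

// generateKey mirrors the store's layout: sha1(dir) followed by fileName.
func generateKey(dirPath, fileName string) []byte {
	h := sha1.Sum([]byte(dirPath))
	return append(h[:], []byte(fileName)...)
}

func main() {
	key := generateKey("/buckets/b1", "file.txt")
	// The first sha1.Size (20) bytes locate the directory; the rest is the name.
	fmt.Printf("prefix=%x name=%s\n", key[:sha1.Size], key[sha1.Size:])
}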
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
new file mode 100644
index 000000000..1d9428c69
--- /dev/null
+++ b/weed/filer/tikv/tikv_store_kv.go
@@ -0,0 +1,50 @@
+//go:build tikv
+// +build tikv
+
+package tikv
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+}
+
+func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var data []byte = nil
+ err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ data = val
+ }
+ return err
+ })
+ if isNotExists(err) {
+ return data, filer.ErrKvNotFound
+ }
+ return data, err
+}
+
+func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+}
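tikv_store_kv.go rounds out the store with generic KV hooks that reuse the same transaction wrapper: each call runs in its own transaction unless ctx already carries one, and a missing key is translated to filer.ErrKvNotFound. A hedged usage sketch, assuming an initialized store and a build with the tikv tag:

package example

import (
	"context"
	"log"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/filer/tikv"
)

func kvRoundTrip(store *tikv.TikvStore) {
	ctx := context.Background()
	if err := store.KvPut(ctx, []byte("k"), []byte("v")); err != nil {
		log.Fatal(err)
	}
	val, err := store.KvGet(ctx, []byte("k"))
	if err == filer.ErrKvNotFound {
		log.Println("key not found")
	} else if err != nil {
		log.Fatal(err)
	} else {
		log.Printf("got %q", val)
	}
	if err := store.KvDelete(ctx, []byte("k")); err != nil {
		log.Fatal(err)
	}
}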