diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-09-13 10:34:33 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-13 10:34:33 +0800 |
| commit | 1de733fda507e1da94b2e4741c74ba7e5e2c5f76 (patch) | |
| tree | aed7ac29e27e0f8def942154603375396fae9489 /weed/filer | |
| parent | 27c05f8c0b5c7bda43babeb61d79684d11851111 (diff) | |
| parent | 7591336a2269c1ad92266280634bcaea34f7a5d1 (diff) | |
| download | seaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.tar.xz seaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.zip | |
Merge pull request #81 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/etcd/etcd_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 5 | ||||
| -rw-r--r-- | weed/filer/filer_notify_append.go | 11 | ||||
| -rw-r--r-- | weed/filer/leveldb/leveldb_store.go | 7 | ||||
| -rw-r--r-- | weed/filer/leveldb2/leveldb2_store.go | 7 | ||||
| -rw-r--r-- | weed/filer/leveldb3/leveldb3_store.go | 14 | ||||
| -rw-r--r-- | weed/filer/read_remote.go | 23 | ||||
| -rw-r--r-- | weed/filer/remote_mapping.go | 121 | ||||
| -rw-r--r-- | weed/filer/remote_storage.go (renamed from weed/filer/filer_remote_storage.go) | 110 | ||||
| -rw-r--r-- | weed/filer/remote_storage_test.go (renamed from weed/filer/filer_remote_storage_test.go) | 6 | ||||
| -rw-r--r-- | weed/filer/sqlite/sqlite_store.go | 1 | ||||
| -rw-r--r-- | weed/filer/sqlite/sqlite_store_unsupported.go | 1 | ||||
| -rw-r--r-- | weed/filer/stream.go | 143 | ||||
| -rw-r--r-- | weed/filer/tikv/tikv.go | 5 | ||||
| -rw-r--r-- | weed/filer/tikv/tikv_store.go | 389 | ||||
| -rw-r--r-- | weed/filer/tikv/tikv_store_kv.go | 50 |
16 files changed, 759 insertions, 136 deletions
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 71ed738f9..2a5dfc926 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/client/v3" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 00dbf1fd6..32008271b 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -6,6 +6,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "math" + "net/url" + "strings" "time" "github.com/golang/protobuf/proto" @@ -108,6 +110,9 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { receivedData = receivedData[:0] + if strings.Contains(urlString, "%") { + urlString = url.PathEscape(urlString) + } shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { receivedData = append(receivedData, data...) }) diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index d441bbbc9..73762cde7 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -66,7 +66,16 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi // upload data targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: "", + Cipher: f.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: assignResult.Auth, + } + uploadResult, err := operation.UploadData(data, uploadOption) if err != nil { return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index ce454f36a..9a7670d42 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/syndtr/goleveldb/leveldb" leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" "os" @@ -45,9 +46,9 @@ func (store *LevelDBStore) initialize(dir string) (err error) { } opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 10, + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + Filter: filter.NewBloomFilter(8), // false positive rate 0.02 } if store.db, err = leveldb.OpenFile(dir, opts); err != nil { diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 4c4409c4d..966686ed9 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -46,10 +46,9 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { } opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - Filter: filter.NewBloomFilter(8), // false positive rate 0.02 + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + Filter: filter.NewBloomFilter(8), // false positive rate 0.02 } for d := 0; d < dbCount; d++ { diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index bc57a6605..86e2b584b 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -66,17 +66,15 @@ func (store *LevelDB3Store) initialize(dir string) (err error) { func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) { bloom := filter.NewBloomFilter(8) // false positive rate 0.02 opts := &opt.Options{ - BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - Filter: bloom, + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + Filter: bloom, } if name != DEFAULT { opts = &opt.Options{ - BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB - WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB - CompactionTableSizeMultiplier: 4, - Filter: bloom, + BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB + Filter: bloom, } } diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index eee536ff6..58639024b 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -2,8 +2,8 @@ package filer import ( "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -11,21 +11,8 @@ func (entry *Entry) IsInRemoteOnly() bool { return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.RemoteSize > 0 } -func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data []byte, err error) { - client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName) - if !found { - return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName) - } - - mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath) - - sourceLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoation, entry.FullPath) - - return client.ReadFile(sourceLoc, offset, size) -} - -func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, fp util.FullPath) *filer_pb.RemoteStorageLocation { - remoteLocation := &filer_pb.RemoteStorageLocation{ +func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, fp util.FullPath) *remote_pb.RemoteStorageLocation { + remoteLocation := &remote_pb.RemoteStorageLocation{ Name: remoteMountedLocation.Name, Bucket: remoteMountedLocation.Bucket, Path: remoteMountedLocation.Path, @@ -34,11 +21,11 @@ func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMou return remoteLocation } -func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, remoteLocationPath string)(fp util.FullPath) { +func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string) (fp util.FullPath) { return localMountedDir.Child(remoteLocationPath[len(remoteMountedLocation.Path):]) } -func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *filer_pb.RemoteConf, remoteLocation *filer_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { +func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { _, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{ Directory: string(parent), diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go new file mode 100644 index 000000000..fb74dca98 --- /dev/null +++ b/weed/filer/remote_mapping.go @@ -0,0 +1,121 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" +) + +func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *remote_pb.RemoteStorageMapping, readErr error) { + var oldContent []byte + if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return readErr + }); readErr != nil { + return nil, readErr + } + + mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) + if readErr != nil { + return nil, fmt.Errorf("unmarshal mappings: %v", readErr) + } + + return +} + +func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing mapping: %v", err) + } + } + + // add new mapping + newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation) + if err != nil { + return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err) + } + + // save back + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +} + +func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing mapping: %v", err) + } + } + + // add new mapping + newContent, err = removeRemoteStorageMapping(oldContent, dir) + if err != nil { + return fmt.Errorf("delete mount %s: %v", dir, err) + } + + // save back + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +} + +func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) { + mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) + if unmarshalErr != nil { + // skip + } + + // set the new mapping + mappings.Mappings[dir] = storageLocation + + if newContent, err = proto.Marshal(mappings); err != nil { + return oldContent, fmt.Errorf("marshal mappings: %v", err) + } + + return +} + +func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) { + mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) + if unmarshalErr != nil { + return nil, unmarshalErr + } + + // set the new mapping + delete(mappings.Mappings, dir) + + if newContent, err = proto.Marshal(mappings); err != nil { + return oldContent, fmt.Errorf("marshal mappings: %v", err) + } + + return +} diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/remote_storage.go index 99ea1d3bb..4ff21f3b3 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/remote_storage.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -21,13 +22,13 @@ const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping" type FilerRemoteStorage struct { rules ptrie.Trie - storageNameToConf map[string]*filer_pb.RemoteConf + storageNameToConf map[string]*remote_pb.RemoteConf } func NewFilerRemoteStorage() (rs *FilerRemoteStorage) { rs = &FilerRemoteStorage{ rules: ptrie.New(), - storageNameToConf: make(map[string]*filer_pb.RemoteConf), + storageNameToConf: make(map[string]*remote_pb.RemoteConf), } return rs } @@ -56,7 +57,7 @@ func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *F if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) { return nil } - conf := &filer_pb.RemoteConf{} + conf := &remote_pb.RemoteConf{} if err := proto.Unmarshal(entry.Content, conf); err != nil { return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err) } @@ -66,7 +67,7 @@ func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *F } func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) { - mappings := &filer_pb.RemoteStorageMapping{} + mappings := &remote_pb.RemoteStorageMapping{} if err := proto.Unmarshal(data, mappings); err != nil { return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err) } @@ -76,23 +77,23 @@ func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err er return nil } -func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) { +func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *remote_pb.RemoteStorageLocation) { rs.rules.Put([]byte(dir+"/"), loc) } -func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) { +func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) { rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { mountDir = util.FullPath(string(key[:len(key)-1])) - remoteLocation = value.(*filer_pb.RemoteStorageLocation) + remoteLocation = value.(*remote_pb.RemoteStorageLocation) return true }) return } -func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { - var storageLocation *filer_pb.RemoteStorageLocation +func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) { + var storageLocation *remote_pb.RemoteStorageLocation rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { - storageLocation = value.(*filer_pb.RemoteStorageLocation) + storageLocation = value.(*remote_pb.RemoteStorageLocation) return true }) @@ -104,7 +105,7 @@ func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client r return rs.GetRemoteStorageClient(storageLocation.Name) } -func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { +func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) { remoteConf, found = rs.storageNameToConf[storageName] if !found { return @@ -118,9 +119,9 @@ func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client return } -func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) { - mappings = &filer_pb.RemoteStorageMapping{ - Mappings: make(map[string]*filer_pb.RemoteStorageLocation), +func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error) { + mappings = &remote_pb.RemoteStorageMapping{ + Mappings: make(map[string]*remote_pb.RemoteStorageLocation), } if len(oldContent) > 0 { if err = proto.Unmarshal(oldContent, mappings); err != nil { @@ -130,70 +131,51 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.Remot return } -func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) { - mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) - if unmarshalErr != nil { - // skip - } - - // set the new mapping - mappings.Mappings[dir] = storageLocation - - if newContent, err = proto.Marshal(mappings); err != nil { - return oldContent, fmt.Errorf("marshal mappings: %v", err) - } - - return -} - -func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) { - mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) - if unmarshalErr != nil { - return nil, unmarshalErr - } - - // set the new mapping - delete(mappings.Mappings, dir) - - if newContent, err = proto.Marshal(mappings); err != nil { - return oldContent, fmt.Errorf("marshal mappings: %v", err) - } - - return -} - -func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) { +func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) return readErr }); readErr != nil { return nil, readErr } - mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) - if readErr != nil { - return nil, fmt.Errorf("unmarshal mappings: %v", readErr) + // unmarshal storage configuration + conf = &remote_pb.RemoteConf{} + if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil { + readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) + return } return } -func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) { - var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) - return readErr - }); readErr != nil { - return nil, readErr +func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress string, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) { + + mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress) + if listErr != nil { + return nil, "", nil, nil, listErr + } + if dir == "" { + return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option") } - // unmarshal storage configuration - conf = &filer_pb.RemoteConf{} - if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil { - readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) - return + var localMountedDir string + var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation + for k, loc := range mappings.Mappings { + if strings.HasPrefix(dir, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir) } - return + // find remote storage configuration + remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name) + if err != nil { + return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err + } + + return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil } diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/remote_storage_test.go index 35ffc7538..9f4d7af2f 100644 --- a/weed/filer/filer_remote_storage_test.go +++ b/weed/filer/remote_storage_test.go @@ -1,20 +1,20 @@ package filer import ( - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/stretchr/testify/assert" "testing" ) func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) { - conf := &filer_pb.RemoteConf{ + conf := &remote_pb.RemoteConf{ Name: "s7", Type: "s3", } rs := NewFilerRemoteStorage() rs.storageNameToConf[conf.Name] = conf - rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{ + rs.mapDirectoryToRemoteStorage("/a/b/c", &remote_pb.RemoteStorageLocation{ Name: "s7", Bucket: "some", Path: "/dir", diff --git a/weed/filer/sqlite/sqlite_store.go b/weed/filer/sqlite/sqlite_store.go index 6b055e53c..ca9d38786 100644 --- a/weed/filer/sqlite/sqlite_store.go +++ b/weed/filer/sqlite/sqlite_store.go @@ -1,3 +1,4 @@ +//go:build linux || darwin || windows // +build linux darwin windows // limited GOOS due to modernc.org/libc/unistd diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go index 803c71afa..0fba1ea33 100644 --- a/weed/filer/sqlite/sqlite_store_unsupported.go +++ b/weed/filer/sqlite/sqlite_store_unsupported.go @@ -1,3 +1,4 @@ +//go:build !linux && !darwin && !windows && !s390 && !ppc64le && !mips64 // +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64 // limited GOOS due to modernc.org/libc/unistd diff --git a/weed/filer/stream.go b/weed/filer/stream.go index c61ee3c12..4a70d118e 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -7,6 +7,7 @@ import ( "math" "sort" "strings" + "sync" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -16,6 +17,49 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) +func HasData(entry *filer_pb.Entry) bool { + + if len(entry.Content) > 0 { + return true + } + + return len(entry.Chunks) > 0 +} + +func IsSameData(a, b *filer_pb.Entry) bool { + + if len(a.Content) > 0 || len(b.Content) > 0 { + return bytes.Equal(a.Content, b.Content) + } + + return isSameChunks(a.Chunks, b.Chunks) +} + +func isSameChunks(a, b []*filer_pb.FileChunk) bool { + if len(a) != len(b) { + return false + } + sort.Slice(a, func(i, j int) bool { + return strings.Compare(a[i].ETag, a[j].ETag) < 0 + }) + sort.Slice(b, func(i, j int) bool { + return strings.Compare(b[i].ETag, b[j].ETag) < 0 + }) + for i := 0; i < len(a); i++ { + if a[i].ETag != b[i].ETag { + return false + } + } + return true +} + +func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader { + if len(entry.Content) > 0 { + return bytes.NewReader(entry.Content) + } + return NewChunkStreamReader(filerClient, entry.Chunks) +} + func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) @@ -83,14 +127,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - totalSize int64 - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferPos int - nextChunkViewIndex int - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + logicOffset int64 + buffer []byte + bufferOffset int64 + bufferLock sync.Mutex + chunk string + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) @@ -132,26 +176,29 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F } func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { - if err = c.prepareBufferFor(c.logicOffset); err != nil { + c.bufferLock.Lock() + defer c.bufferLock.Unlock() + if err = c.prepareBufferFor(off); err != nil { return } - return c.Read(p) + c.logicOffset = off + return c.doRead(p) } func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { + c.bufferLock.Lock() + defer c.bufferLock.Unlock() + return c.doRead(p) +} + +func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) { + // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer))) for n < len(p) { - if c.isBufferEmpty() { - if c.nextChunkViewIndex >= len(c.chunkViews) { - return n, io.EOF - } - chunkView := c.chunkViews[c.nextChunkViewIndex] - if err = c.fetchChunkToBuffer(chunkView); err != nil { - return - } - c.nextChunkViewIndex++ + // println("read", c.logicOffset) + if err = c.prepareBufferFor(c.logicOffset); err != nil { + return } - t := copy(p[n:], c.buffer[c.bufferPos:]) - c.bufferPos += t + t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:]) n += t c.logicOffset += int64(t) } @@ -159,10 +206,12 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { } func (c *ChunkStreamReader) isBufferEmpty() bool { - return len(c.buffer) <= c.bufferPos + return len(c.buffer) <= int(c.logicOffset-c.bufferOffset) } func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { + c.bufferLock.Lock() + defer c.bufferLock.Unlock() var err error switch whence { @@ -182,33 +231,59 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { } +func insideChunk(offset int64, chunk *ChunkView) bool { + return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) +} + func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { // stay in the same chunk - if !c.isBufferEmpty() { - if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { - c.bufferPos = int(offset - c.bufferOffset) - return nil - } + if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { + return nil } + // fmt.Printf("fetch for offset %d\n", offset) + // need to seek to a different chunk currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { - return c.chunkViews[i].LogicOffset <= offset + return offset < c.chunkViews[i].LogicOffset }) if currentChunkIndex == len(c.chunkViews) { - return io.EOF + // not found + if insideChunk(offset, c.chunkViews[0]) { + // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) + currentChunkIndex = 0 + } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) { + currentChunkIndex = len(c.chunkViews) - 1 + // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) + } else { + return io.EOF + } + } else if currentChunkIndex > 0 { + if insideChunk(offset, c.chunkViews[currentChunkIndex]) { + // good hit + } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) { + currentChunkIndex -= 1 + // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) + } else { + // glog.Fatalf("unexpected1 offset %d", offset) + return fmt.Errorf("unexpected1 offset %d", offset) + } + } else { + // glog.Fatalf("unexpected2 offset %d", offset) + return fmt.Errorf("unexpected2 offset %d", offset) } // positioning within the new chunk chunk := c.chunkViews[currentChunkIndex] - if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if insideChunk(offset, chunk) { if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { if err = c.fetchChunkToBuffer(chunk); err != nil { return } - c.nextChunkViewIndex = currentChunkIndex + 1 } - c.bufferPos = int(offset - c.bufferOffset) + } else { + // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) + return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) } return } @@ -239,10 +314,10 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return err } c.buffer = buffer.Bytes() - c.bufferPos = 0 c.bufferOffset = chunkView.LogicOffset + c.chunk = chunkView.FileId - // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) return nil } diff --git a/weed/filer/tikv/tikv.go b/weed/filer/tikv/tikv.go new file mode 100644 index 000000000..8bb5dc577 --- /dev/null +++ b/weed/filer/tikv/tikv.go @@ -0,0 +1,5 @@ +package tikv + +/* + * This empty file is let go build can work without tikv tag + */ diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go new file mode 100644 index 000000000..561f23910 --- /dev/null +++ b/weed/filer/tikv/tikv_store.go @@ -0,0 +1,389 @@ +//go:build tikv +// +build tikv + +package tikv + +import ( + "bytes" + "context" + "crypto/sha1" + "fmt" + "io" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" +) + +var ( + _ filer.FilerStore = ((*TikvStore)(nil)) +) + +func init() { + filer.Stores = append(filer.Stores, &TikvStore{}) +} + +type TikvStore struct { + client *tikv.KVStore + deleteRangeConcurrency int +} + +// Basic APIs +func (store *TikvStore) GetName() string { + return "tikv" +} + +func (store *TikvStore) Initialize(config util.Configuration, prefix string) error { + pdAddrs := []string{} + pdAddrsStr := config.GetString(prefix + "pdaddrs") + for _, item := range strings.Split(pdAddrsStr, ",") { + pdAddrs = append(pdAddrs, strings.TrimSpace(item)) + } + drc := config.GetInt(prefix + "deleterange_concurrency") + if drc <= 0 { + drc = 1 + } + store.deleteRangeConcurrency = drc + return store.initialize(pdAddrs) +} + +func (store *TikvStore) initialize(pdAddrs []string) error { + client, err := tikv.NewTxnClient(pdAddrs) + store.client = client + return err +} + +func (store *TikvStore) Shutdown() { + err := store.client.Close() + if err != nil { + glog.V(0).Infof("Shutdown TiKV client got error: %v", err) + } +} + +// ~ Basic APIs + +// Entry APIs +func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { + dir, name := entry.DirAndName() + key := generateKey(dir, name) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Set(key, value) + }) + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + return nil +} + +func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + return store.InsertEntry(ctx, entry) +} + +func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) { + dir, name := path.DirAndName() + key := generateKey(dir, name) + + txn, err := store.getTxn(ctx) + if err != nil { + return nil, err + } + var value []byte = nil + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + val, err := txn.Get(context.TODO(), key) + if err == nil { + value = val + } + return err + }) + + if isNotExists(err) || value == nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", path, err) + } + + entry := &filer.Entry{ + FullPath: path, + } + err = entry.DecodeAttributesAndChunks(value) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil +} + +func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error { + dir, name := path.DirAndName() + key := generateKey(dir, name) + + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Delete(key) + }) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + return nil +} + +// ~ Entry APIs + +// Directory APIs +func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { + directoryPrefix := genDirectoryKeyPrefix(path, "") + + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + var ( + startKey []byte = nil + endKey []byte = nil + ) + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + iter, err := txn.Iter(directoryPrefix, nil) + if err != nil { + return err + } + defer iter.Close() + for iter.Valid() { + key := iter.Key() + endKey = key + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + if startKey == nil { + startKey = key + } + + err = iter.Next() + if err != nil { + return err + } + } + // Only one Key matched just delete it. + if startKey != nil && bytes.Equal(startKey, endKey) { + return txn.Delete(startKey) + } + return nil + }) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + + if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) { + // has startKey and endKey and they are not equals, so use delete range + _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + } + return err +} + +func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + lastFileName := "" + directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName) + } + + txn, err := store.getTxn(ctx) + if err != nil { + return lastFileName, err + } + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + iter, err := txn.Iter(lastFileStart, nil) + if err != nil { + return err + } + defer iter.Close() + i := int64(0) + first := true + for iter.Valid() { + if first { + first = false + if !includeStartFile { + if iter.Valid() { + // Check first item is lastFileStart + if bytes.Equal(iter.Key(), lastFileStart) { + // Is lastFileStart and not include start file, just + // ignore it. + err = iter.Next() + if err != nil { + return err + } + continue + } + } + } + } + // Check for limitation + if limit > 0 { + i++ + if i > limit { + break + } + } + // Validate key prefix + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + value := iter.Value() + + // Start process + fileName := getNameFromKey(key) + if fileName != "" { + // Got file name, then generate the Entry + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), fileName), + } + // Update lastFileName + lastFileName = fileName + // Check for decode value. + if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil { + // Got error just return the error + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + return err + } + // Run for each callback if return false just break the iteration + if !eachEntryFunc(entry) { + break + } + } + // End process + + err = iter.Next() + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) + } + return lastFileName, nil +} + +// ~ Directory APIs + +// Transaction Related APIs +func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.client.Begin() + if err != nil { + return ctx, err + } + return context.WithValue(ctx, "tx", tx), nil +} + +func (store *TikvStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return tx.Commit(context.Background()) + } + return nil +} + +func (store *TikvStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return tx.Rollback() + } + return nil +} + +// ~ Transaction Related APIs + +// Transaction Wrapper +type TxnWrapper struct { + *txnkv.KVTxn + inContext bool +} + +func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error { + err := f(w.KVTxn) + if !w.inContext { + if err != nil { + w.KVTxn.Rollback() + return err + } + w.KVTxn.Commit(context.Background()) + return nil + } + return err +} + +func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return &TxnWrapper{tx, true}, nil + } + txn, err := store.client.Begin() + if err != nil { + return nil, err + } + return &TxnWrapper{txn, false}, nil +} + +// ~ Transaction Wrapper + +// Encoding Functions +func hashToBytes(dir string) []byte { + h := sha1.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return b +} + +func generateKey(dirPath, fileName string) []byte { + key := hashToBytes(dirPath) + key = append(key, []byte(fileName)...) + return key +} + +func getNameFromKey(key []byte) string { + return string(key[sha1.Size:]) +} + +func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = hashToBytes(string(fullpath)) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func isNotExists(err error) bool { + if err == nil { + return false + } + if err.Error() == "not exist" { + return true + } + return false +} + +// ~ Encoding Functions diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go new file mode 100644 index 000000000..1d9428c69 --- /dev/null +++ b/weed/filer/tikv/tikv_store_kv.go @@ -0,0 +1,50 @@ +//go:build tikv +// +build tikv + +package tikv + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/tikv/client-go/v2/txnkv" +) + +func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error { + tw, err := store.getTxn(ctx) + if err != nil { + return err + } + return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Set(key, value) + }) +} + +func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { + tw, err := store.getTxn(ctx) + if err != nil { + return nil, err + } + var data []byte = nil + err = tw.RunInTxn(func(txn *txnkv.KVTxn) error { + val, err := txn.Get(context.TODO(), key) + if err == nil { + data = val + } + return err + }) + if isNotExists(err) { + return data, filer.ErrKvNotFound + } + return data, err +} + +func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error { + tw, err := store.getTxn(ctx) + if err != nil { + return err + } + return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Delete(key) + }) +} |
