diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-07-28 22:43:12 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-07-28 22:43:12 -0700 |
| commit | c090d6bb254b7d5666d0158fc8d7d54c10161c11 (patch) | |
| tree | a8b4b31be1f5569a439d5fd363cc1982ea44cc76 /weed/filer | |
| parent | 035b0bae2982921f4de158308103f9e893ee9cc2 (diff) | |
| download | seaweedfs-c090d6bb254b7d5666d0158fc8d7d54c10161c11.tar.xz seaweedfs-c090d6bb254b7d5666d0158fc8d7d54c10161c11.zip | |
add ReadRemote(), add read remote setup when filer starts
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filer.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer_on_meta_event.go | 14 | ||||
| -rw-r--r-- | weed/filer/filer_remote_storage.go | 26 | ||||
| -rw-r--r-- | weed/filer/filer_remote_storage_test.go | 8 | ||||
| -rw-r--r-- | weed/filer/read_remote.go | 27 |
5 files changed, 69 insertions, 8 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index d4c0b4eef..162db175a 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -42,6 +42,7 @@ type Filer struct { MetaAggregator *MetaAggregator Signature int32 FilerConf *FilerConf + RemoteStorage *FilerRemoteStorage } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -51,6 +52,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), + RemoteStorage: NewFilerRemoteStorage(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index c9f75a5ca..32be4f180 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -12,6 +12,7 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { f.maybeReloadFilerConfiguration(event) + f.maybeReloadRemoteStorageConfigurationAndMapping(event) f.onBucketEvents(event) } @@ -80,3 +81,16 @@ func (f *Filer) LoadFilerConf() { } f.FilerConf = fc } + +//////////////////////////////////// +// load and maintain remote storages +//////////////////////////////////// +func (f *Filer) LoadRemoteStorageConfAndMapping() { + if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil { + glog.Errorf("read remote conf and mapping: %v", err) + return + } +} +func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) { + // FIXME add reloading +} diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go index f6f3adb22..18b2676bc 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/filer_remote_storage.go @@ -31,7 +31,7 @@ func NewFilerRemoteStorage() (rs *FilerRemoteStorage) { return rs } -func (rs *FilerRemoteStorage) loadRemoteStorageConfigurations(filer *Filer) (err error) { +func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) { // execute this on filer entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "") @@ -74,7 +74,21 @@ func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, rem rs.rules.Put([]byte(dir+"/"), remoteStorageName) } -func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, found bool) { +func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation remote_storage.RemoteStorageLocation) { + var storageLocation string + rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { + mountDir = util.FullPath(string(key)) + storageLocation = value.(string) + return true + }) + if storageLocation == "" { + return + } + remoteLocation = remote_storage.RemoteStorageLocation(storageLocation) + return +} + +func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { var storageLocation string rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { storageLocation = value.(string) @@ -87,8 +101,12 @@ func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client r storageName, _, _ := remote_storage.RemoteStorageLocation(storageLocation).NameBucketPath() - remoteConf, ok := rs.storageNameToConf[storageName] - if !ok { + return rs.GetRemoteStorageClient(storageName) +} + +func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { + remoteConf, found = rs.storageNameToConf[storageName] + if !found { return } diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go index 1a41c6e63..e5996475e 100644 --- a/weed/filer/filer_remote_storage_test.go +++ b/weed/filer/filer_remote_storage_test.go @@ -16,15 +16,15 @@ func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) { rs.mapDirectoryToRemoteStorage("/a/b/c", "s7") - _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f") + _, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f") assert.Equal(t, true, found, "find storage client") - _, found2 := rs.FindRemoteStorageClient("/a/b") + _, _, found2 := rs.FindRemoteStorageClient("/a/b") assert.Equal(t, false, found2, "should not find storage client") - _, found3 := rs.FindRemoteStorageClient("/a/b/c") + _, _, found3 := rs.FindRemoteStorageClient("/a/b/c") assert.Equal(t, false, found3, "should not find storage client") - _, found4 := rs.FindRemoteStorageClient("/a/b/cc") + _, _, found4 := rs.FindRemoteStorageClient("/a/b/cc") assert.Equal(t, false, found4, "should not find storage client") }
\ No newline at end of file diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go new file mode 100644 index 000000000..57450d6d8 --- /dev/null +++ b/weed/filer/read_remote.go @@ -0,0 +1,27 @@ +package filer + +import ( + "fmt" + "io" +) + +func (entry *Entry) IsRemoteOnly() bool { + return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0 +} + +func (f *Filer) ReadRemote(w io.Writer, entry *Entry, offset int64, size int64) error { + client, _, found := f.RemoteStorage.GetRemoteStorageClient(remoteEntry.Remote.StorageName) + if !found { + return fmt.Errorf("remote storage %v not found", entry.Remote.StorageName) + } + + mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath) + _, bucket, path := remoteLoation.NameBucketPath() + + remoteFullPath := path + string(entry.FullPath[len(mountDir):]) + + client.ReadFile(bucket, remoteFullPath[1:], offset, size, func(w io.Writer) error { + + }) + return nil +} |
