aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-07-28 22:43:12 -0700
committerChris Lu <chris.lu@gmail.com>2021-07-28 22:43:12 -0700
commitc090d6bb254b7d5666d0158fc8d7d54c10161c11 (patch)
treea8b4b31be1f5569a439d5fd363cc1982ea44cc76
parent035b0bae2982921f4de158308103f9e893ee9cc2 (diff)
downloadseaweedfs-c090d6bb254b7d5666d0158fc8d7d54c10161c11.tar.xz
seaweedfs-c090d6bb254b7d5666d0158fc8d7d54c10161c11.zip
add ReadRemote(), add read remote setup when filer starts
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_on_meta_event.go14
-rw-r--r--weed/filer/filer_remote_storage.go26
-rw-r--r--weed/filer/filer_remote_storage_test.go8
-rw-r--r--weed/filer/read_remote.go27
-rw-r--r--weed/remote_storage/remote_storage.go6
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/filer_server_handlers_read.go15
8 files changed, 86 insertions, 14 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index d4c0b4eef..162db175a 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -42,6 +42,7 @@ type Filer struct {
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
+ RemoteStorage *FilerRemoteStorage
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -51,6 +52,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
+ RemoteStorage: NewFilerRemoteStorage(),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index c9f75a5ca..32be4f180 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -12,6 +12,7 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
f.maybeReloadFilerConfiguration(event)
+ f.maybeReloadRemoteStorageConfigurationAndMapping(event)
f.onBucketEvents(event)
}
@@ -80,3 +81,16 @@ func (f *Filer) LoadFilerConf() {
}
f.FilerConf = fc
}
+
+////////////////////////////////////
+// load and maintain remote storages
+////////////////////////////////////
+func (f *Filer) LoadRemoteStorageConfAndMapping() {
+ if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil {
+ glog.Errorf("read remote conf and mapping: %v", err)
+ return
+ }
+}
+func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) {
+ // FIXME add reloading
+}
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go
index f6f3adb22..18b2676bc 100644
--- a/weed/filer/filer_remote_storage.go
+++ b/weed/filer/filer_remote_storage.go
@@ -31,7 +31,7 @@ func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
return rs
}
-func (rs *FilerRemoteStorage) loadRemoteStorageConfigurations(filer *Filer) (err error) {
+func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
// execute this on filer
entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
@@ -74,7 +74,21 @@ func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, rem
rs.rules.Put([]byte(dir+"/"), remoteStorageName)
}
-func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, found bool) {
+func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation remote_storage.RemoteStorageLocation) {
+ var storageLocation string
+ rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+ mountDir = util.FullPath(string(key))
+ storageLocation = value.(string)
+ return true
+ })
+ if storageLocation == "" {
+ return
+ }
+ remoteLocation = remote_storage.RemoteStorageLocation(storageLocation)
+ return
+}
+
+func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
var storageLocation string
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
storageLocation = value.(string)
@@ -87,8 +101,12 @@ func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client r
storageName, _, _ := remote_storage.RemoteStorageLocation(storageLocation).NameBucketPath()
- remoteConf, ok := rs.storageNameToConf[storageName]
- if !ok {
+ return rs.GetRemoteStorageClient(storageName)
+}
+
+func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+ remoteConf, found = rs.storageNameToConf[storageName]
+ if !found {
return
}
diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go
index 1a41c6e63..e5996475e 100644
--- a/weed/filer/filer_remote_storage_test.go
+++ b/weed/filer/filer_remote_storage_test.go
@@ -16,15 +16,15 @@ func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
rs.mapDirectoryToRemoteStorage("/a/b/c", "s7")
- _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
+ _, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
assert.Equal(t, true, found, "find storage client")
- _, found2 := rs.FindRemoteStorageClient("/a/b")
+ _, _, found2 := rs.FindRemoteStorageClient("/a/b")
assert.Equal(t, false, found2, "should not find storage client")
- _, found3 := rs.FindRemoteStorageClient("/a/b/c")
+ _, _, found3 := rs.FindRemoteStorageClient("/a/b/c")
assert.Equal(t, false, found3, "should not find storage client")
- _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
+ _, _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
assert.Equal(t, false, found4, "should not find storage client")
} \ No newline at end of file
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
new file mode 100644
index 000000000..57450d6d8
--- /dev/null
+++ b/weed/filer/read_remote.go
@@ -0,0 +1,27 @@
+package filer
+
+import (
+ "fmt"
+ "io"
+)
+
+func (entry *Entry) IsRemoteOnly() bool {
+ return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0
+}
+
+func (f *Filer) ReadRemote(w io.Writer, entry *Entry, offset int64, size int64) error {
+ client, _, found := f.RemoteStorage.GetRemoteStorageClient(remoteEntry.Remote.StorageName)
+ if !found {
+ return fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
+ }
+
+ mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
+ _, bucket, path := remoteLoation.NameBucketPath()
+
+ remoteFullPath := path + string(entry.FullPath[len(mountDir):])
+
+ client.ReadFile(bucket, remoteFullPath[1:], offset, size, func(w io.Writer) error {
+
+ })
+ return nil
+}
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
index 8794e7268..b8c2b55ea 100644
--- a/weed/remote_storage/remote_storage.go
+++ b/weed/remote_storage/remote_storage.go
@@ -3,6 +3,7 @@ package remote_storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
"strings"
"sync"
)
@@ -14,10 +15,10 @@ func (remote RemoteStorageLocation) NameBucketPath() (storageName, bucket, remot
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 3)
- if len(parts)>=1 {
+ if len(parts) >= 1 {
storageName = parts[0]
}
- if len(parts)>=2 {
+ if len(parts) >= 2 {
bucket = parts[1]
}
remotePath = string(remote[len(storageName)+1+len(bucket):])
@@ -31,6 +32,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface {
Traverse(remote RemoteStorageLocation, visitFn VisitFunc) error
+ ReadFile(bucket, key string, offset int64, size int64, writeFn func(w io.Writer) error) error
}
type RemoteStorageClientMaker interface {
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index d7afaa65a..534bc4840 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -149,6 +149,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.LoadFilerConf()
+ fs.filer.LoadRemoteStorageConfAndMapping()
+
grace.OnInterrupt(func() {
fs.filer.Shutdown()
})
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 957e08855..add0be1f4 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -101,7 +101,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
//Seaweed custom header are not visible to Vue or javascript
seaweedHeaders := []string{}
- for header, _ := range w.Header() {
+ for header := range w.Header() {
if strings.HasPrefix(header, "Seaweed-") {
seaweedHeaders = append(seaweedHeaders, header)
}
@@ -163,9 +163,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
return err
}
- err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
- if err != nil {
- glog.Errorf("failed to stream content %s: %v", r.URL, err)
+ if entry.IsRemoteOnly() {
+ err = fs.filer.ReadRemote(writer, entry, offset, size)
+ if err != nil {
+ glog.Errorf("failed to read remote %s: %v", r.URL, err)
+ }
+ } else {
+ err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ if err != nil {
+ glog.Errorf("failed to stream content %s: %v", r.URL, err)
+ }
}
return err
})