aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/entry.go6
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_on_meta_event.go14
-rw-r--r--weed/filer/filer_remote_storage.go182
-rw-r--r--weed/filer/filer_remote_storage_test.go34
-rw-r--r--weed/filer/filer_search.go5
-rw-r--r--weed/filer/read_remote.go29
-rw-r--r--weed/filer/stream.go32
8 files changed, 289 insertions, 15 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index ede58a384..7673365fb 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -42,7 +42,7 @@ type Entry struct {
HardLinkId HardLinkId
HardLinkCounter int32
Content []byte
- Remote *filer_pb.Entry_Remote
+ Remote *filer_pb.RemoteEntry
}
func (entry *Entry) Size() uint64 {
@@ -78,7 +78,7 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) {
message.HardLinkId = entry.HardLinkId
message.HardLinkCounter = entry.HardLinkCounter
message.Content = entry.Content
- message.Remote = entry.Remote
+ message.RemoteEntry = entry.Remote
}
func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
@@ -88,7 +88,7 @@ func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
fsEntry.HardLinkId = HardLinkId(message.HardLinkId)
fsEntry.HardLinkCounter = message.HardLinkCounter
fsEntry.Content = message.Content
- fsEntry.Remote = message.Remote
+ fsEntry.Remote = message.RemoteEntry
}
func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 862f98496..1a20abefc 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -42,6 +42,7 @@ type Filer struct {
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
+ RemoteStorage *FilerRemoteStorage
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -51,6 +52,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
+ RemoteStorage: NewFilerRemoteStorage(),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index 5717b2b09..34ac5321a 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -12,6 +12,7 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
f.maybeReloadFilerConfiguration(event)
+ f.maybeReloadRemoteStorageConfigurationAndMapping(event)
f.onBucketEvents(event)
}
@@ -84,3 +85,16 @@ func (f *Filer) LoadFilerConf() {
}
f.FilerConf = fc
}
+
+////////////////////////////////////
+// load and maintain remote storages
+////////////////////////////////////
+
+// LoadRemoteStorageConfAndMapping loads the remote storage configurations
+// and directory mount mappings into f.RemoteStorage. Errors are logged and
+// swallowed so a bad or missing config does not stop the filer.
+func (f *Filer) LoadRemoteStorageConfAndMapping() {
+	if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil {
+		glog.Errorf("read remote conf and mapping: %v", err)
+		return
+	}
+}
+
+// maybeReloadRemoteStorageConfigurationAndMapping is intended to refresh the
+// remote storage state when a metadata change event touches the remote
+// configuration directory; reloading is not implemented yet.
+func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) {
+	// FIXME add reloading
+}
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go
new file mode 100644
index 000000000..573dcf3e7
--- /dev/null
+++ b/weed/filer/filer_remote_storage.go
@@ -0,0 +1,182 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+ "math"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/viant/ptrie"
+)
+
+// Suffix of per-storage configuration files stored under DirectoryEtcRemote.
+const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
+
+// Name of the file under DirectoryEtcRemote that holds the serialized
+// directory-to-remote mount mapping.
+const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
+
+// FilerRemoteStorage holds the known remote storage configurations (keyed by
+// storage name) and a prefix trie mapping mounted local directories to their
+// remote storage locations.
+type FilerRemoteStorage struct {
+	rules             ptrie.Trie // key: directory path with trailing "/"; value: *filer_pb.RemoteStorageLocation
+	storageNameToConf map[string]*filer_pb.RemoteConf
+}
+
+// NewFilerRemoteStorage creates an empty FilerRemoteStorage ready for use.
+func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
+	rs = &FilerRemoteStorage{
+		rules:             ptrie.New(),
+		storageNameToConf: make(map[string]*filer_pb.RemoteConf),
+	}
+	return rs
+}
+
+// LoadRemoteStorageConfigurationsAndMapping reads every entry under
+// DirectoryEtcRemote on this filer: the mount mapping file populates the
+// directory trie, and each "*.conf" file registers a remote storage
+// configuration. A missing directory is treated as "nothing configured".
+func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
+	// execute this on filer
+
+	entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
+	if err != nil {
+		if err == filer_pb.ErrNotFound {
+			return nil
+		}
+		glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
+		return
+	}
+
+	for _, entry := range entries {
+		if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
+			if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
+				return err
+			}
+			continue
+		}
+		// skip unrelated files instead of aborting: previously this returned
+		// nil here, so one stray non-".conf" entry silently prevented all
+		// remaining configurations from being loaded
+		if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
+			continue
+		}
+		conf := &filer_pb.RemoteConf{}
+		if err := proto.Unmarshal(entry.Content, conf); err != nil {
+			return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
+		}
+		rs.storageNameToConf[conf.Name] = conf
+	}
+	return nil
+}
+
+// loadRemoteStorageMountMapping deserializes a RemoteStorageMapping and
+// registers each directory -> remote location pair in the prefix trie.
+func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
+	mappings := &filer_pb.RemoteStorageMapping{}
+	if err := proto.Unmarshal(data, mappings); err != nil {
+		return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
+	}
+	for dir, storageLocation := range mappings.Mappings {
+		rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
+	}
+	return nil
+}
+
+// mapDirectoryToRemoteStorage records that paths under dir are backed by loc.
+// The trailing "/" in the trie key makes the key a directory prefix, so a
+// sibling like "/a/b/cc" will not match a mount at "/a/b/c".
+func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) {
+	rs.rules.Put([]byte(dir+"/"), loc)
+}
+
+// FindMountDirectory returns the mounted directory that contains p and its
+// remote storage location; both results are zero values when p is not under
+// any mount. NOTE(review): if nested mounts exist, MatchPrefix may invoke the
+// callback for several keys and the last one visited wins — confirm that this
+// yields the longest (deepest) mount.
+func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
+	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+		mountDir = util.FullPath(string(key[:len(key)-1])) // strip the trailing "/" added by mapDirectoryToRemoteStorage
+		remoteLocation = value.(*filer_pb.RemoteStorageLocation)
+		return true
+	})
+	return
+}
+
+// FindRemoteStorageClient locates the remote storage location whose mount
+// contains p (if any) and returns a client for that storage. found is false
+// when p is under no mount or when the storage cannot be resolved.
+func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+	var storageLocation *filer_pb.RemoteStorageLocation
+	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+		storageLocation = value.(*filer_pb.RemoteStorageLocation)
+		return true
+	})
+
+	if storageLocation == nil {
+		found = false
+		return
+	}
+
+	return rs.GetRemoteStorageClient(storageLocation.Name)
+}
+
+// GetRemoteStorageClient looks up the configuration for storageName and
+// constructs a client for it. found is true only when the configuration
+// exists AND a client was successfully created.
+func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+	remoteConf, found = rs.storageNameToConf[storageName]
+	if !found {
+		return
+	}
+
+	var err error
+	if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
+		found = true
+		return
+	}
+	// the configuration exists but the client could not be constructed;
+	// previously found stayed true from the map lookup on this path, handing
+	// callers found=true with a nil client
+	found = false
+	return
+}
+
+// UnmarshalRemoteStorageMappings deserializes a RemoteStorageMapping from
+// oldContent. Empty content yields an empty (non-nil) mapping. On a decode
+// error the error is both logged and returned, and mappings is still non-nil
+// so callers may choose to continue with whatever was decoded.
+// NOTE(review): proto.Unmarshal may leave partially merged data in mappings
+// on error — confirm callers tolerate that.
+func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) {
+	mappings = &filer_pb.RemoteStorageMapping{
+		Mappings: make(map[string]*filer_pb.RemoteStorageLocation),
+	}
+	if len(oldContent) > 0 {
+		if err = proto.Unmarshal(oldContent, mappings); err != nil {
+			glog.Warningf("unmarshal existing mappings: %v", err)
+		}
+	}
+	return
+}
+
+// AddRemoteStorageMapping sets (or replaces) the mapping for dir inside the
+// serialized RemoteStorageMapping held in oldContent and returns the new
+// serialized bytes. On marshal failure the original bytes are returned
+// together with the error.
+func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
+	// deliberately ignore the unmarshal error: UnmarshalRemoteStorageMappings
+	// always returns a usable (possibly empty) mapping and already logs the
+	// problem, so a corrupt old file is overwritten rather than blocking the
+	// update (previously this was an empty "if unmarshalErr != nil {}" branch)
+	mappings, _ := UnmarshalRemoteStorageMappings(oldContent)
+
+	// set the new mapping
+	mappings.Mappings[dir] = storageLocation
+
+	if newContent, err = proto.Marshal(mappings); err != nil {
+		return oldContent, fmt.Errorf("marshal mappings: %v", err)
+	}
+
+	return
+}
+
+
+// ReadMountMappings fetches the mount mapping file from the filer at
+// filerAddress over gRPC and deserializes it. Returns a non-nil mapping on
+// success, or an error if the read or the unmarshal fails.
+func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
+	var oldContent []byte
+	if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+		oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+		return readErr
+	}); readErr != nil {
+		return nil, readErr
+	}
+
+	mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
+	if readErr != nil {
+		return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+	}
+
+	return
+}
+
+// ReadRemoteStorageConf fetches the "<storageName>.conf" file from the filer
+// at filerAddress over gRPC and deserializes it into a RemoteConf.
+func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
+	var oldContent []byte
+	if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+		oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
+		return readErr
+	}); readErr != nil {
+		return nil, readErr
+	}
+
+	// unmarshal storage configuration
+	conf = &filer_pb.RemoteConf{}
+	if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
+		readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
+		return
+	}
+
+	return
+}
diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go
new file mode 100644
index 000000000..427cd5a8a
--- /dev/null
+++ b/weed/filer/filer_remote_storage_test.go
@@ -0,0 +1,34 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// TestFilerRemoteStorage_FindRemoteStorageClient verifies the prefix-match
+// semantics: only paths strictly below a mounted directory resolve to a
+// storage client.
+func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
+	conf := &filer_pb.RemoteConf{
+		Name: "s7",
+		Type: "s3",
+	}
+	rs := NewFilerRemoteStorage()
+	rs.storageNameToConf[conf.Name] = conf
+
+	rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{
+		Name:   "s7",
+		Bucket: "some",
+		Path:   "/dir",
+	})
+
+	// inside the mount: matches the stored "/a/b/c/" prefix key
+	_, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
+	assert.Equal(t, true, found, "find storage client")
+
+	// above the mount: no key is a prefix of "/a/b"
+	_, _, found2 := rs.FindRemoteStorageClient("/a/b")
+	assert.Equal(t, false, found2, "should not find storage client")
+
+	// the mount directory itself does not match because keys carry a trailing "/"
+	_, _, found3 := rs.FindRemoteStorageClient("/a/b/c")
+	assert.Equal(t, false, found3, "should not find storage client")
+
+	// a sibling sharing only a string prefix must not match
+	_, _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
+	assert.Equal(t, false, found4, "should not find storage client")
+} \ No newline at end of file
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 2ee29be25..2e0336da8 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"github.com/chrislusf/seaweedfs/weed/util"
+ "math"
"path/filepath"
"strings"
)
@@ -27,6 +28,10 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
return true
})
+ if limit == math.MaxInt64 {
+ limit = math.MaxInt64 - 1
+ }
+
hasMore = int64(len(entries)) >= limit+1
if hasMore {
entries = entries[:limit]
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
new file mode 100644
index 000000000..d9423ceda
--- /dev/null
+++ b/weed/filer/read_remote.go
@@ -0,0 +1,29 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+// IsInRemoteOnly reports whether the entry's content lives only in remote
+// storage: it has no local chunks but a remote entry with a positive size.
+func (entry *Entry) IsInRemoteOnly() bool {
+	return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0
+}
+
+// ReadRemote reads size bytes at offset of the entry's content directly from
+// the remote storage it is mounted from, translating the local path under
+// the mount directory into the remote path.
+func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data []byte, err error) {
+	client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName)
+	if !found {
+		return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
+	}
+
+	mountDir, remoteLocation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
+	if remoteLocation == nil {
+		// guard: FindMountDirectory returns nil when the entry is under no
+		// mount; previously this dereferenced nil below
+		return nil, fmt.Errorf("%s is not mounted to any remote storage", entry.FullPath)
+	}
+
+	// local path relative to the mount point, appended to the remote base path
+	remoteFullPath := remoteLocation.Path + string(entry.FullPath[len(mountDir):])
+
+	sourceLoc := &filer_pb.RemoteStorageLocation{
+		Name:   remoteLocation.Name,
+		Bucket: remoteLocation.Bucket,
+		Path:   remoteFullPath,
+	}
+
+	return client.ReadFile(sourceLoc, offset, size)
+}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 3859f9a67..503e6b23f 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
type ChunkStreamReader struct {
chunkViews []*ChunkView
totalSize int64
+ logicOffset int64
buffer []byte
bufferOffset int64
bufferPos int
@@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
- _, err = c.Seek(off, io.SeekStart)
- if err != nil {
+ if err = c.prepareBufferFor(c.logicOffset); err != nil {
return
}
return c.Read(p)
@@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
return n, io.EOF
}
chunkView := c.chunkViews[c.nextChunkViewIndex]
- c.fetchChunkToBuffer(chunkView)
+ if err = c.fetchChunkToBuffer(chunkView); err != nil {
+ return
+ }
c.nextChunkViewIndex++
}
t := copy(p[n:], c.buffer[c.bufferPos:])
c.bufferPos += t
n += t
+ c.logicOffset += int64(t)
}
return
}
@@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
- offset += c.bufferOffset + int64(c.bufferPos)
+ offset += c.logicOffset
case io.SeekEnd:
offset = c.totalSize + offset
}
if offset > c.totalSize {
err = io.ErrUnexpectedEOF
+ } else {
+ c.logicOffset = offset
}
+ return offset, err
+
+}
+
+func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
// stay in the same chunk
if !c.isBufferEmpty() {
if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
c.bufferPos = int(offset - c.bufferOffset)
- return offset, nil
+ return nil
}
}
@@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
return c.chunkViews[i].LogicOffset <= offset
})
if currentChunkIndex == len(c.chunkViews) {
- return 0, io.EOF
+ return io.EOF
}
// positioning within the new chunk
chunk := c.chunkViews[currentChunkIndex]
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
- c.fetchChunkToBuffer(chunk)
+ if err = c.fetchChunkToBuffer(chunk); err != nil {
+ return
+ }
c.nextChunkViewIndex = currentChunkIndex + 1
}
c.bufferPos = int(offset - c.bufferOffset)
- } else {
- return 0, io.ErrUnexpectedEOF
}
-
- return offset, err
-
+ return
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {