diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/entry.go | 6 | ||||
| -rw-r--r-- | weed/filer/filer.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer_on_meta_event.go | 14 | ||||
| -rw-r--r-- | weed/filer/filer_remote_storage.go | 182 | ||||
| -rw-r--r-- | weed/filer/filer_remote_storage_test.go | 34 | ||||
| -rw-r--r-- | weed/filer/filer_search.go | 5 | ||||
| -rw-r--r-- | weed/filer/read_remote.go | 29 | ||||
| -rw-r--r-- | weed/filer/stream.go | 32 |
8 files changed, 289 insertions, 15 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go index ede58a384..7673365fb 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -42,7 +42,7 @@ type Entry struct { HardLinkId HardLinkId HardLinkCounter int32 Content []byte - Remote *filer_pb.Entry_Remote + Remote *filer_pb.RemoteEntry } func (entry *Entry) Size() uint64 { @@ -78,7 +78,7 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) { message.HardLinkId = entry.HardLinkId message.HardLinkCounter = entry.HardLinkCounter message.Content = entry.Content - message.Remote = entry.Remote + message.RemoteEntry = entry.Remote } func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) { @@ -88,7 +88,7 @@ func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) { fsEntry.HardLinkId = HardLinkId(message.HardLinkId) fsEntry.HardLinkCounter = message.HardLinkCounter fsEntry.Content = message.Content - fsEntry.Remote = message.Remote + fsEntry.Remote = message.RemoteEntry } func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 862f98496..1a20abefc 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -42,6 +42,7 @@ type Filer struct { MetaAggregator *MetaAggregator Signature int32 FilerConf *FilerConf + RemoteStorage *FilerRemoteStorage } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -51,6 +52,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), + RemoteStorage: NewFilerRemoteStorage(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 5717b2b09..34ac5321a 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -12,6 +12,7 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { f.maybeReloadFilerConfiguration(event) + f.maybeReloadRemoteStorageConfigurationAndMapping(event) f.onBucketEvents(event) } @@ -84,3 +85,16 @@ func (f *Filer) LoadFilerConf() { } f.FilerConf = fc } + +//////////////////////////////////// +// load and maintain remote storages +//////////////////////////////////// +func (f *Filer) LoadRemoteStorageConfAndMapping() { + if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil { + glog.Errorf("read remote conf and mapping: %v", err) + return + } +} +func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) { + // FIXME add reloading +} diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go new file mode 100644 index 000000000..573dcf3e7 --- /dev/null +++ b/weed/filer/filer_remote_storage.go @@ -0,0 +1,182 @@ +package filer + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "math" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/viant/ptrie" +) + +const REMOTE_STORAGE_CONF_SUFFIX = ".conf" +const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping" + +type FilerRemoteStorage struct { + rules ptrie.Trie + storageNameToConf map[string]*filer_pb.RemoteConf +} + +func NewFilerRemoteStorage() (rs *FilerRemoteStorage) { + rs = &FilerRemoteStorage{ + rules: ptrie.New(), + storageNameToConf: make(map[string]*filer_pb.RemoteConf), + } + return rs +} + +func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) { + // execute this on filer + + entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "") + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err) + return + } + + for _, entry := range entries { + if entry.Name() == REMOTE_STORAGE_MOUNT_FILE { + if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil { + return err + } + continue + } + if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) { + return nil + } + conf := &filer_pb.RemoteConf{} + if err := proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err) + } + rs.storageNameToConf[conf.Name] = conf + } + return nil +} + +func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) { + mappings := &filer_pb.RemoteStorageMapping{} + if err := proto.Unmarshal(data, mappings); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err) + } + for dir, storageLocation := range mappings.Mappings { + rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation) + } + return nil +} + +func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) { + rs.rules.Put([]byte(dir+"/"), loc) +} + +func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) { + rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { + mountDir = util.FullPath(string(key[:len(key)-1])) + remoteLocation = value.(*filer_pb.RemoteStorageLocation) + return true + }) + return +} + +func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { + var storageLocation *filer_pb.RemoteStorageLocation + rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { + storageLocation = value.(*filer_pb.RemoteStorageLocation) + return true + }) + + if storageLocation == nil { + found = false + return + } + + return rs.GetRemoteStorageClient(storageLocation.Name) +} + +func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { + remoteConf, found = rs.storageNameToConf[storageName] + if !found { + return + } + + var err error + if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil { + found = true + return + } + return +} + +func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) { + mappings = &filer_pb.RemoteStorageMapping{ + Mappings: make(map[string]*filer_pb.RemoteStorageLocation), + } + if len(oldContent) > 0 { + if err = proto.Unmarshal(oldContent, mappings); err != nil { + glog.Warningf("unmarshal existing mappings: %v", err) + } + } + return +} + +func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) { + mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) + if unmarshalErr != nil { + // skip + } + + // set the new mapping + mappings.Mappings[dir] = storageLocation + + if newContent, err = proto.Marshal(mappings); err != nil { + return oldContent, fmt.Errorf("marshal mappings: %v", err) + } + + return +} + + +func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) { + var oldContent []byte + if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return readErr + }); readErr != nil { + return nil, readErr + } + + mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) + if readErr != nil { + return nil, fmt.Errorf("unmarshal mappings: %v", readErr) + } + + return +} + +func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) { + var oldContent []byte + if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) + return readErr + }); readErr != nil { + return nil, readErr + } + + // unmarshal storage configuration + conf = &filer_pb.RemoteConf{} + if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil { + readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) + return + } + + return +} diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go new file mode 100644 index 000000000..427cd5a8a --- /dev/null +++ b/weed/filer/filer_remote_storage_test.go @@ -0,0 +1,34 @@ +package filer + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) { + conf := &filer_pb.RemoteConf{ + Name: "s7", + Type: "s3", + } + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + + rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{ + Name: "s7", + Bucket: "some", + Path: "/dir", + }) + + _, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f") + assert.Equal(t, true, found, "find storage client") + + _, _, found2 := rs.FindRemoteStorageClient("/a/b") + assert.Equal(t, false, found2, "should not find storage client") + + _, _, found3 := rs.FindRemoteStorageClient("/a/b/c") + assert.Equal(t, false, found3, "should not find storage client") + + _, _, found4 := rs.FindRemoteStorageClient("/a/b/cc") + assert.Equal(t, false, found4, "should not find storage client") +}
\ No newline at end of file diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 2ee29be25..2e0336da8 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -3,6 +3,7 @@ package filer import ( "context" "github.com/chrislusf/seaweedfs/weed/util" + "math" "path/filepath" "strings" ) @@ -27,6 +28,10 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start return true }) + if limit == math.MaxInt64 { + limit = math.MaxInt64 - 1 + } + hasMore = int64(len(entries)) >= limit+1 if hasMore { entries = entries[:limit] diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go new file mode 100644 index 000000000..d9423ceda --- /dev/null +++ b/weed/filer/read_remote.go @@ -0,0 +1,29 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (entry *Entry) IsInRemoteOnly() bool { + return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0 +} + +func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data[]byte, err error) { + client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName) + if !found { + return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName) + } + + mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath) + + remoteFullPath := remoteLoation.Path + string(entry.FullPath[len(mountDir):]) + + sourceLoc := &filer_pb.RemoteStorageLocation{ + Name: remoteLoation.Name, + Bucket: remoteLoation.Bucket, + Path: remoteFullPath, + } + + return client.ReadFile(sourceLoc, offset, size) +} diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 3859f9a67..503e6b23f 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) type ChunkStreamReader struct { chunkViews []*ChunkView totalSize int64 + logicOffset int64 buffer []byte bufferOffset int64 bufferPos int @@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F } func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { - _, err = c.Seek(off, io.SeekStart) - if err != nil { + if err = c.prepareBufferFor(c.logicOffset); err != nil { return } return c.Read(p) @@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { return n, io.EOF } chunkView := c.chunkViews[c.nextChunkViewIndex] - c.fetchChunkToBuffer(chunkView) + if err = c.fetchChunkToBuffer(chunkView); err != nil { + return + } c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t n += t + c.logicOffset += int64(t) } return } @@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { switch whence { case io.SeekStart: case io.SeekCurrent: - offset += c.bufferOffset + int64(c.bufferPos) + offset += c.logicOffset case io.SeekEnd: offset = c.totalSize + offset } if offset > c.totalSize { err = io.ErrUnexpectedEOF + } else { + c.logicOffset = offset } + return offset, err + +} + +func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { // stay in the same chunk if !c.isBufferEmpty() { if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { c.bufferPos = int(offset - c.bufferOffset) - return offset, nil + return nil } } @@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { return c.chunkViews[i].LogicOffset <= offset }) if currentChunkIndex == len(c.chunkViews) { - return 0, io.EOF + return io.EOF } // positioning within the new chunk chunk := c.chunkViews[currentChunkIndex] if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) + if err = c.fetchChunkToBuffer(chunk); err != nil { + return + } c.nextChunkViewIndex = currentChunkIndex + 1 } c.bufferPos = int(offset - c.bufferOffset) - } else { - return 0, io.ErrUnexpectedEOF } - - return offset, err - + return } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { |
