diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-01-24 19:01:58 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-01-24 19:01:58 -0800 |
| commit | 00707ec00fb8016ac9ef8858a01a9784a6aee1a0 (patch) | |
| tree | dfe313d9fead4bcdf741e52f7b6316d2da00d71b /weed/filesys | |
| parent | 2c5eac5705c12b4dc0930d0a27478a73924b9e16 (diff) | |
| download | seaweedfs-00707ec00fb8016ac9ef8858a01a9784a6aee1a0.tar.xz seaweedfs-00707ec00fb8016ac9ef8858a01a9784a6aee1a0.zip | |
mount: outsideContainerClusterMode proxy through filer
Running mount outside of the cluster would not need to expose all the volume servers to outside of the cluster. The chunk read and write will go through the filer.
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 5 | ||||
| -rw-r--r-- | weed/filesys/file.go | 5 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 6 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 13 | ||||
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 84 | ||||
| -rw-r--r-- | weed/filesys/wfs_filer_client.go | 7 | ||||
| -rw-r--r-- | weed/filesys/wfs_write.go | 5 |
7 files changed, 22 insertions, 103 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index d86d92ac9..3d0a00a8b 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -404,11 +404,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode() delete(dir.wfs.handles, inodeId) - // delete the chunks last - if isDeleteData { - dir.wfs.deleteFileChunks(entry.Chunks) - } - return nil } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a2b6660d8..a8d6dac29 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -147,9 +147,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } file.entry.Chunks = chunks - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) file.reader = nil - file.wfs.deleteFileChunks(truncatedChunks) } file.entry.Attributes.FileSize = req.Size file.dirtyMetadata = true @@ -329,7 +328,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks) file.reader = nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 6225ab968..da42ae562 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } @@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { if fh.f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64) - fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize) + fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } totalRead, err := fh.f.reader.ReadAt(buff, offset) @@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) - chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks) + chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) if manifestErr != nil { // not good, but should be ok diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index cd14e8032..236ecdacb 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -3,6 +3,8 @@ package filesys import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/wdclient" "math" "os" "path" @@ -24,6 +26,7 @@ import ( ) type Option struct { + FilerAddress string FilerGrpcAddress string GrpcDialOption grpc.DialOption FilerMountRootPath string @@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { } entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) } + +func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { + if wfs.option.OutsideContainerClusterMode { + return func(fileId string) (targetUrls []string, err error) { + return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil + } + } + return filer.LookupFn(wfs) + +} diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go deleted file mode 100644 index a245b6795..000000000 --- a/weed/filesys/wfs_deletion.go +++ /dev/null @@ -1,84 +0,0 @@ -package filesys - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { - if len(chunks) == 0 { - return - } - - var fileIds []string - for _, chunk := range chunks { - if !chunk.IsChunkManifest { - fileIds = append(fileIds, chunk.GetFileIdString()) - continue - } - dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk) - if manifestResolveErr != nil { - glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) - } - for _, dChunk := range dataChunks { - fileIds = append(fileIds, dChunk.GetFileIdString()) - } - fileIds = append(fileIds, chunk.GetFileIdString()) - } - - wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) - return nil - }) -} - -func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { - - var vids []string - for _, fileId := range fileIds { - vids = append(vids, filer.VolumeId(fileId)) - } - - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { - - m := make(map[string]operation.LookupResult) - - glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return m, err - } - - for _, vid := range vids { - lr := operation.LookupResult{ - VolumeId: vid, - Locations: nil, - } - locations, found := resp.LocationsMap[vid] - if !found { - continue - } - for _, loc := range locations.Locations { - lr.Locations = append(lr.Locations, operation.Location{ - Url: wfs.AdjustedUrl(loc), - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = lr - } - - return m, err - } - - _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) - - return err -} diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index ef4213af1..e0d352a7b 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro return err } - -func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { - if wfs.option.OutsideContainerClusterMode { - return location.PublicUrl - } - return location.Url -} diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 83e40e7f5..dfe6e57a6 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun Url: resp.Url, PublicUrl: resp.PublicUrl, } - host = wfs.AdjustedUrl(loc) + host = loc.Url collection, replication = resp.Collection, resp.Replication return nil @@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if wfs.option.OutsideContainerClusterMode { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId) + } uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) |
