aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-24 19:01:58 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-24 19:01:58 -0800
commit00707ec00fb8016ac9ef8858a01a9784a6aee1a0 (patch)
treedfe313d9fead4bcdf741e52f7b6316d2da00d71b /weed/filesys
parent2c5eac5705c12b4dc0930d0a27478a73924b9e16 (diff)
downloadseaweedfs-00707ec00fb8016ac9ef8858a01a9784a6aee1a0.tar.xz
seaweedfs-00707ec00fb8016ac9ef8858a01a9784a6aee1a0.zip
mount: outsideContainerClusterMode proxy through filer
Running mount outside of the cluster would not need to expose all the volume servers to outside of the cluster. The chunk read and write will go through the filer.
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go5
-rw-r--r--weed/filesys/file.go5
-rw-r--r--weed/filesys/filehandle.go6
-rw-r--r--weed/filesys/wfs.go13
-rw-r--r--weed/filesys/wfs_deletion.go84
-rw-r--r--weed/filesys/wfs_filer_client.go7
-rw-r--r--weed/filesys/wfs_write.go5
7 files changed, 22 insertions, 103 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index d86d92ac9..3d0a00a8b 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -404,11 +404,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
delete(dir.wfs.handles, inodeId)
- // delete the chunks last
- if isDeleteData {
- dir.wfs.deleteFileChunks(entry.Chunks)
- }
-
return nil
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index a2b6660d8..a8d6dac29 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -147,9 +147,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
file.entry.Chunks = chunks
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.reader = nil
- file.wfs.deleteFileChunks(truncatedChunks)
}
file.entry.Attributes.FileSize = req.Size
file.dirtyMetadata = true
@@ -329,7 +328,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 6225ab968..da42ae562 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
if fh.f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
- fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
+ fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
}
totalRead, err := fh.f.reader.ReadAt(buff, offset)
@@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
- chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
+ chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil {
// not good, but should be ok
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index cd14e8032..236ecdacb 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -3,6 +3,8 @@ package filesys
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"os"
"path"
@@ -24,6 +26,7 @@ import (
)
type Option struct {
+ FilerAddress string
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
@@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}
+
+func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
+ if wfs.option.OutsideContainerClusterMode {
+ return func(fileId string) (targetUrls []string, err error) {
+ return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ }
+ }
+ return filer.LookupFn(wfs)
+
+}
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
deleted file mode 100644
index a245b6795..000000000
--- a/weed/filesys/wfs_deletion.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package filesys
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
- if len(chunks) == 0 {
- return
- }
-
- var fileIds []string
- for _, chunk := range chunks {
- if !chunk.IsChunkManifest {
- fileIds = append(fileIds, chunk.GetFileIdString())
- continue
- }
- dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
- if manifestResolveErr != nil {
- glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
- }
- for _, dChunk := range dataChunks {
- fileIds = append(fileIds, dChunk.GetFileIdString())
- }
- fileIds = append(fileIds, chunk.GetFileIdString())
- }
-
- wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
- return nil
- })
-}
-
-func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
-
- var vids []string
- for _, fileId := range fileIds {
- vids = append(vids, filer.VolumeId(fileId))
- }
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
-
- m := make(map[string]operation.LookupResult)
-
- glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return m, err
- }
-
- for _, vid := range vids {
- lr := operation.LookupResult{
- VolumeId: vid,
- Locations: nil,
- }
- locations, found := resp.LocationsMap[vid]
- if !found {
- continue
- }
- for _, loc := range locations.Locations {
- lr.Locations = append(lr.Locations, operation.Location{
- Url: wfs.AdjustedUrl(loc),
- PublicUrl: loc.PublicUrl,
- })
- }
- m[vid] = lr
- }
-
- return m, err
- }
-
- _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
-
- return err
-}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index ef4213af1..e0d352a7b 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return err
}
-
-func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
- if wfs.option.OutsideContainerClusterMode {
- return location.PublicUrl
- }
- return location.Url
-}
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 83e40e7f5..dfe6e57a6 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
Url: resp.Url,
PublicUrl: resp.PublicUrl,
}
- host = wfs.AdjustedUrl(loc)
+ host = loc.Url
collection, replication = resp.Collection, resp.Replication
return nil
@@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if wfs.option.OutsideContainerClusterMode {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
+ }
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)