aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-24 01:41:38 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-24 01:41:38 -0800
commit096e088d7bb2a5dce7573b24c2d3006d1cb6f9ec (patch)
tree888668ade913d7e8b912fd73b781c52ae3a700a1
parentccbdb38c89a79a482c46faaa9b0dd53e3dacb822 (diff)
downloadseaweedfs-096e088d7bb2a5dce7573b24c2d3006d1cb6f9ec.tar.xz
seaweedfs-096e088d7bb2a5dce7573b24c2d3006d1cb6f9ec.zip
mount: when outside cluster network, use filer as proxy to access volume servers
-rw-r--r--weed/command/mount_std.go1
-rw-r--r--weed/filer/reader_at.go6
-rw-r--r--weed/filesys/file.go4
-rw-r--r--weed/filesys/filehandle.go6
-rw-r--r--weed/filesys/wfs.go13
-rw-r--r--weed/filesys/wfs_deletion.go4
-rw-r--r--weed/filesys/wfs_filer_client.go7
-rw-r--r--weed/filesys/wfs_write.go5
-rw-r--r--weed/messaging/broker/broker_append.go4
-rw-r--r--weed/pb/filer_pb/filer_client.go1
-rw-r--r--weed/replication/sink/filersink/fetch_write.go3
-rw-r--r--weed/replication/source/filer_source.go4
-rw-r--r--weed/s3api/s3api_handlers.go3
-rw-r--r--weed/server/webdav_server.go5
-rw-r--r--weed/shell/commands.go4
15 files changed, 29 insertions, 41 deletions
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 9e955e344..a95ecd567 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -169,6 +169,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
+ FilerAddress: filer,
FilerGrpcAddress: filerGrpcAddress,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 307224f35..41d177210 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -71,7 +71,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
for _, loc := range locations.Locations {
- volumeServerAddress := filerClient.AdjustedUrl(loc)
+ volumeServerAddress := loc.Url
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
targetUrls = append(targetUrls, targetUrl)
}
@@ -85,11 +85,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
-func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
- lookupFileId: LookupFn(filerClient),
+ lookupFileId: lookupFn,
chunkCache: chunkCache,
fileSize: fileSize,
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index a2b6660d8..8054435b1 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -147,7 +147,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
file.entry.Chunks = chunks
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.reader = nil
file.wfs.deleteFileChunks(truncatedChunks)
}
@@ -329,7 +329,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 6225ab968..da42ae562 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
if fh.f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
- fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
+ fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
}
totalRead, err := fh.f.reader.ReadAt(buff, offset)
@@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
- chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
+ chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil {
// not good, but should be ok
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index cd14e8032..236ecdacb 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -3,6 +3,8 @@ package filesys
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"os"
"path"
@@ -24,6 +26,7 @@ import (
)
type Option struct {
+ FilerAddress string
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
@@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}
+
+func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
+ if wfs.option.OutsideContainerClusterMode {
+ return func(fileId string) (targetUrls []string, err error) {
+ return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ }
+ }
+ return filer.LookupFn(wfs)
+
+}
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index a245b6795..7ed936a62 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -22,7 +22,7 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
fileIds = append(fileIds, chunk.GetFileIdString())
continue
}
- dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
+ dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(wfs.LookupFn(), chunk)
if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
}
@@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
}
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
- Url: wfs.AdjustedUrl(loc),
+ Url: loc.Url,
PublicUrl: loc.PublicUrl,
})
}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index ef4213af1..e0d352a7b 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return err
}
-
-func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
- if wfs.option.OutsideContainerClusterMode {
- return location.PublicUrl
- }
- return location.Url
-}
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 83e40e7f5..dfe6e57a6 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
Url: resp.Url,
PublicUrl: resp.PublicUrl,
}
- host = wfs.AdjustedUrl(loc)
+ host = loc.Url
collection, replication = resp.Collection, resp.Replication
return nil
@@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if wfs.option.OutsideContainerClusterMode {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
+ }
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 8e5b56fd0..67c9bcb79 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -107,7 +107,3 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
return
}
-
-func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 079fbd671..7198de95c 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -20,7 +20,6 @@ var (
type FilerClient interface {
WithFilerClient(fn func(SeaweedFilerClient) error) error
- AdjustedUrl(location *Location) string
}
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index b062adcfe..544b84995 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -128,6 +128,3 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
}, fs.grpcAddress, fs.grpcDialOption)
}
-func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 3982360b0..eff1da8dc 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -124,10 +124,6 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro
}
-func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
-
func volumeId(fileId string) string {
lastCommaIndex := strings.LastIndex(fileId, ",")
if lastCommaIndex > 0 {
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 6935c75bd..57b26f3dd 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -50,9 +50,6 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
}
-func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
// If none of the http routes match respond with MethodNotAllowed
func notFoundHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 2b238e534..5bd92a136 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -123,9 +123,6 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
-func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/")
@@ -523,7 +520,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
}
if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
- f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
+ f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}
readSize, err = f.reader.ReadAt(p, f.off)
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 0e285214b..6e1348ca5 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -102,10 +102,6 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error
}
-func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
-
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
if strings.HasPrefix(entryPath, "http") {
var u *url.URL