aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/source/filer_source.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/replication/source/filer_source.go
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/replication/source/filer_source.go')
-rw-r--r--weed/replication/source/filer_source.go76
1 files changed, 55 insertions, 21 deletions
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index d7b5ebc4d..e2e3575dc 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -3,13 +3,15 @@ package source
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
"io"
"net/http"
"strings"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -23,32 +25,41 @@ type FilerSource struct {
grpcAddress string
grpcDialOption grpc.DialOption
Dir string
+ address string
+ proxyByFiler bool
}
-func (fs *FilerSource) Initialize(configuration util.Configuration) error {
- return fs.initialize(
- configuration.GetString("grpcAddress"),
- configuration.GetString("directory"),
+func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
+ return fs.DoInitialize(
+ "",
+ configuration.GetString(prefix+"grpcAddress"),
+ configuration.GetString(prefix+"directory"),
+ false,
)
}
-func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
+func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) {
+ fs.address = address
+ if fs.address == "" {
+ fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
+ }
fs.grpcAddress = grpcAddress
fs.Dir = dir
- fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ fs.proxyByFiler = readChunkFromFiler
return nil
}
-func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl string, err error) {
+func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
vid := volumeId(part)
- err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read lookup volume id locations: %v", vid)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
@@ -62,42 +73,65 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl s
if err != nil {
glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
- return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
+ return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
- return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
+ return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
- fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+ if !fs.proxyByFiler {
+ for _, loc := range locations.Locations {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part))
+ }
+ } else {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
+ }
return
}
-func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
+func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
- fileUrl, err := fs.LookupFileId(ctx, part)
+ if fs.proxyByFiler {
+ return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId)
+ }
+
+ fileUrls, err := fs.LookupFileId(fileId)
if err != nil {
return "", nil, nil, err
}
- filename, header, readCloser, err = util.DownloadFile(fileUrl)
+ for _, fileUrl := range fileUrls {
+ filename, header, resp, err = util.DownloadFile(fileUrl)
+ if err != nil {
+ glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
+ } else {
+ break
+ }
+ }
- return filename, header, readCloser, err
+ return filename, header, resp, err
}
-func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&FilerSource{})
+
+func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)
}
+func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
func volumeId(fileId string) string {
lastCommaIndex := strings.LastIndex(fileId, ",")
if lastCommaIndex > 0 {