diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
|---|---|---|
| committer | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
| commit | d861cbd81b75b6684c971ac00e33685e6575b833 (patch) | |
| tree | 301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/replication/source/filer_source.go | |
| parent | 70da715d8d917527291b35fb069fac077d17b868 (diff) | |
| parent | 4ee58922eff61a5a4ca29c0b4829b097a498549e (diff) | |
| download | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip | |
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/replication/source/filer_source.go')
| -rw-r--r-- | weed/replication/source/filer_source.go | 76 |
1 files changed, 55 insertions, 21 deletions
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index d7b5ebc4d..e2e3575dc 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -3,13 +3,15 @@ package source import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "google.golang.org/grpc" "io" "net/http" "strings" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -23,32 +25,41 @@ type FilerSource struct { grpcAddress string grpcDialOption grpc.DialOption Dir string + address string + proxyByFiler bool } -func (fs *FilerSource) Initialize(configuration util.Configuration) error { - return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), +func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { + return fs.DoInitialize( + "", + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), + false, ) } -func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { +func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) { + fs.address = address + if fs.address == "" { + fs.address = pb.GrpcAddressToServerAddress(grpcAddress) + } fs.grpcAddress = grpcAddress fs.Dir = dir - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + fs.proxyByFiler = readChunkFromFiler return nil } -func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl string, err error) { +func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) { vid2Locations := make(map[string]*filer_pb.Locations) vid := volumeId(part) - err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read lookup volume id locations: %v", vid) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) if err != nil { @@ -62,42 +73,65 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl s if err != nil { glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err) - return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err) + return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err) } locations := vid2Locations[vid] if locations == nil || len(locations.Locations) == 0 { glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err) - return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) + return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) } - fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) + if !fs.proxyByFiler { + for _, loc := range locations.Locations { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part)) + } + } else { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part)) + } return } -func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { +func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { - fileUrl, err := fs.LookupFileId(ctx, part) + if fs.proxyByFiler { + return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId) + } + + fileUrls, err := fs.LookupFileId(fileId) if err != nil { return "", nil, nil, err } - filename, header, readCloser, err = util.DownloadFile(fileUrl) + for _, fileUrl := range fileUrls { + filename, header, resp, err = util.DownloadFile(fileUrl) + if err != nil { + glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) + } else { + break + } + } - return filename, header, readCloser, err + return filename, header, resp, err } -func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { +var _ = filer_pb.FilerClient(&FilerSource{}) + +func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) } +func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + func volumeId(fileId string) string { lastCommaIndex := strings.LastIndex(fileId, ",") if lastCommaIndex > 0 { |
