aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/source/filer_source.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2019-07-16 11:13:23 +0800
committerGitHub <noreply@github.com>2019-07-16 11:13:23 +0800
commitd19bbee98d89ec6cd603572bd9c5d55749610e61 (patch)
tree8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/replication/source/filer_source.go
parent01060c992591f412b0d5e180bde29991747a9462 (diff)
parent5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff)
downloadseaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.tar.xz
seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.zip
keep update from original (#1)
keep update from original
Diffstat (limited to 'weed/replication/source/filer_source.go')
-rw-r--r--weed/replication/source/filer_source.go33
1 files changed, 17 insertions, 16 deletions
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index efe71e706..d7b5ebc4d 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -3,6 +3,9 @@ package source
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"io"
"net/http"
"strings"
@@ -17,8 +20,9 @@ type ReplicationSource interface {
}
type FilerSource struct {
- grpcAddress string
- Dir string
+ grpcAddress string
+ grpcDialOption grpc.DialOption
+ Dir string
}
func (fs *FilerSource) Initialize(configuration util.Configuration) error {
@@ -31,19 +35,20 @@ func (fs *FilerSource) Initialize(configuration util.Configuration) error {
func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
fs.grpcAddress = grpcAddress
fs.Dir = dir
+ fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
return nil
}
-func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
+func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
vid := volumeId(part)
- err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read lookup volume id locations: %v", vid)
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
@@ -72,9 +77,9 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
return
}
-func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
+func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
- fileUrl, err := fs.LookupFileId(part)
+ fileUrl, err := fs.LookupFileId(ctx, part)
if err != nil {
return "", nil, nil, err
}
@@ -84,17 +89,13 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade
return filename, header, readCloser, err
}
-func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(fs.grpcAddress)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
- }
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.grpcAddress, fs.grpcDialOption)
- return fn(client)
}
func volumeId(fileId string) string {