diff options
Diffstat (limited to 'weed/pb/grpc_client_server.go')
| -rw-r--r-- | weed/pb/grpc_client_server.go | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index f3cca7fba..5f685912e 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -3,6 +3,7 @@ package pb import ( "context" "fmt" + "google.golang.org/grpc/metadata" "math/rand" "net/http" "strconv" @@ -118,7 +119,7 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO } // WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. -func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error { +func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error { if !streamingMode { vgc, err := getOrCreateConnection(address, waitForReady, opts...) @@ -141,7 +142,12 @@ func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address } return executionErr } else { - grpcConnection, err := GrpcDial(context.Background(), address, waitForReady, opts...) + ctx := context.Background() + if signature != 0 { + md := metadata.New(map[string]string{"sw-client-id": fmt.Sprintf("%d", signature)}) + ctx = metadata.NewOutgoingContext(ctx, md) + } + grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...) if err != nil { return fmt.Errorf("fail to dial %s: %v", address, err) } @@ -204,7 +210,7 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { } func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, master.ToGrpcAddress(), waitForReady, grpcDialOption) @@ -212,7 +218,7 @@ func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption g } func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpcDialOption grpc.DialOption, fn func(client volume_server_pb.VolumeServerClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, volumeServer.ToGrpcAddress(), false, grpcDialOption) @@ -220,7 +226,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc } func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, broker.ToGrpcAddress(), false, grpcDialOption) @@ -230,7 +236,7 @@ func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption g func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { - err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption) @@ -244,22 +250,22 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, false, grpcDialOption) } -func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithFilerClient(streamingMode bool, signature int32, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn) + return WithGrpcFilerClient(streamingMode, signature, filer, grpcDialOption, fn) } -func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, signature int32, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption) @@ -269,7 +275,7 @@ func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grp func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { for _, filerAddress := range filerAddresses { - err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerAddress.ToGrpcAddress(), false, grpcDialOption) |
