aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2021-12-26 00:15:03 -0800
committerchrislu <chris.lu@gmail.com>2021-12-26 00:15:03 -0800
commit9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch)
tree1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/messaging
parentc935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff)
downloadseaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz
seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call. this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go8
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go6
-rw-r--r--weed/messaging/broker/broker_server.go10
3 files changed, 12 insertions, 12 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 9958a0752..9a31a8ac0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
dir, name := util.FullPath(targetFile).DirAndName()
// append the chunk
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AppendToEntryRequest{
Directory: dir,
@@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var assignResult = &operation.AssignResult{}
// assign a volume location
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
assignErr := util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
@@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var _ = filer_pb.FilerClient(&MessageBroker{})
-func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
if err == io.EOF {
return
}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
index 66821d404..5cd8edd33 100644
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -34,7 +34,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
Resource: targetTopicPartition,
})
@@ -68,7 +68,7 @@ func (broker *MessageBroker) checkFilers() {
found := false
for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
@@ -93,7 +93,7 @@ func (broker *MessageBroker) checkFilers() {
found = false
for !found {
for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 193c1c689..acf2d6d34 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
for _, filer := range broker.option.Filers {
- broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.KeepConnected(ctx)
@@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
}
-func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+ return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
}
-func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
- return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})