aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_pub.go6
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go1
-rw-r--r--weed/mq/client/pub_client/lookup.go6
-rw-r--r--weed/mq/client/pub_client/publisher.go14
4 files changed, 22 insertions, 5 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 20a31f09c..acbffefba 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -104,10 +104,6 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
respChan := make(chan *mq_pb.PublishResponse, 128)
defer func() {
atomic.StoreInt32(&isStopping, 1)
- response := &mq_pb.PublishResponse{
- Error: "end of stream",
- }
- respChan <- response
close(respChan)
}()
go func() {
@@ -117,7 +113,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
case resp := <-respChan:
if resp != nil {
if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending setup response: %v", err)
+ glog.Errorf("Error sending response %v: %v", resp, err)
}
} else {
return
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index f5a454640..03674db3f 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -51,6 +51,7 @@ func main() {
// Wait for all publishers to finish
wg.Wait()
elapsed := time.Since(startTime)
+ publisher.Shutdown()
log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
index a0b4298d1..28cb29015 100644
--- a/weed/mq/client/pub_client/lookup.go
+++ b/weed/mq/client/pub_client/lookup.go
@@ -5,6 +5,8 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
func (p *TopicPublisher) doLookup(brokerAddress string) error {
@@ -100,6 +102,10 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
for {
_, err := publishClient.Recv()
if err != nil {
+ e, ok := status.FromError(err)
+ if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
+ return
+ }
publishClient.Err = err
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
return
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index f264375fa..7073457f3 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -2,10 +2,12 @@ package pub_client
import (
"github.com/rdleal/intervalst/interval"
+ "github.com/seaweedfs/seaweedfs/weed/mq/broker"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
+ "time"
)
type PublisherConfiguration struct {
@@ -41,3 +43,15 @@ func (p *TopicPublisher) Connect(bootstrapBroker string) error {
}
return nil
}
+
+func (p *TopicPublisher) Shutdown() error {
+
+ if clients, found := p.partition2Broker.AllIntersections(0, broker.MaxPartitionCount); found {
+ for _, client := range clients {
+ client.CloseSend()
+ }
+ }
+ time.Sleep(1100 * time.Millisecond)
+
+ return nil
+}