aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/publish_stream_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/publish_stream_processor.go')
-rw-r--r--weed/mq/client/publish_stream_processor.go11
1 files changed, 6 insertions, 5 deletions
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
index c23c6a64a..f83bcd08b 100644
--- a/weed/mq/client/publish_stream_processor.go
+++ b/weed/mq/client/publish_stream_processor.go
@@ -3,6 +3,7 @@ package client
import (
"context"
flatbuffers "github.com/google/flatbuffers/go"
+ "github.com/seaweedfs/seaweedfs/weed/mq/messages"
"github.com/seaweedfs/seaweedfs/weed/mq/segment"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -31,7 +32,7 @@ type PublishStreamProcessor struct {
timeout time.Duration
// convert into bytes
- messagesChan chan *Message
+ messagesChan chan *messages.Message
builders chan *flatbuffers.Builder
batchMessageCountLimit int
@@ -51,7 +52,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
batchMessageCountLimit: batchMessageCountLimit,
builders: make(chan *flatbuffers.Builder, batchCountLimit),
- messagesChan: make(chan *Message, 1024),
+ messagesChan: make(chan *messages.Message, 1024),
doneChan: make(chan struct{}),
timeout: timeout,
}
@@ -62,7 +63,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration
return t
}
-func (p *PublishStreamProcessor) AddMessage(m *Message) error {
+func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error {
p.messagesChan <- m
return nil
}
@@ -72,7 +73,7 @@ func (p *PublishStreamProcessor) Shutdown() error {
return nil
}
-func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error {
+func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error {
if len(messages) == 0 {
return nil
@@ -102,7 +103,7 @@ func (p *PublishStreamProcessor) doLoopUpload() {
brokerGrpcAddress := "localhost:17777"
// TOOD parallelize the uploading with separate uploader
- messages := make([]*Message, 0, p.batchMessageCountLimit)
+ messages := make([]*messages.Message, 0, p.batchMessageCountLimit)
util.RetryForever("publish message", func() error {
return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {