diff options
Diffstat (limited to 'weed/mq/client/publish_stream_processor.go')
| -rw-r--r-- | weed/mq/client/publish_stream_processor.go | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go index c23c6a64a..f83bcd08b 100644 --- a/weed/mq/client/publish_stream_processor.go +++ b/weed/mq/client/publish_stream_processor.go @@ -3,6 +3,7 @@ package client import ( "context" flatbuffers "github.com/google/flatbuffers/go" + "github.com/seaweedfs/seaweedfs/weed/mq/messages" "github.com/seaweedfs/seaweedfs/weed/mq/segment" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -31,7 +32,7 @@ type PublishStreamProcessor struct { timeout time.Duration // convert into bytes - messagesChan chan *Message + messagesChan chan *messages.Message builders chan *flatbuffers.Builder batchMessageCountLimit int @@ -51,7 +52,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), batchMessageCountLimit: batchMessageCountLimit, builders: make(chan *flatbuffers.Builder, batchCountLimit), - messagesChan: make(chan *Message, 1024), + messagesChan: make(chan *messages.Message, 1024), doneChan: make(chan struct{}), timeout: timeout, } @@ -62,7 +63,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration return t } -func (p *PublishStreamProcessor) AddMessage(m *Message) error { +func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error { p.messagesChan <- m return nil } @@ -72,7 +73,7 @@ func (p *PublishStreamProcessor) Shutdown() error { return nil } -func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error { +func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error { if len(messages) == 0 { return nil @@ -102,7 +103,7 @@ func (p *PublishStreamProcessor) doLoopUpload() { brokerGrpcAddress := "localhost:17777" // TOOD parallelize the uploading with separate uploader - messages := make([]*Message, 0, p.batchMessageCountLimit) + messages := make([]*messages.Message, 0, p.batchMessageCountLimit) util.RetryForever("publish message", func() error { return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { |
