aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/publish_stream_processor.go11
-rw-r--r--weed/mq/client/publisher.go12
-rw-r--r--weed/mq/cmd/qsend/qsend.go3
-rw-r--r--weed/mq/messages/messages.go10
4 files changed, 21 insertions, 15 deletions
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
index c23c6a64a..f83bcd08b 100644
--- a/weed/mq/client/publish_stream_processor.go
+++ b/weed/mq/client/publish_stream_processor.go
@@ -3,6 +3,7 @@ package client
import (
"context"
flatbuffers "github.com/google/flatbuffers/go"
+ "github.com/seaweedfs/seaweedfs/weed/mq/messages"
"github.com/seaweedfs/seaweedfs/weed/mq/segment"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -31,7 +32,7 @@ type PublishStreamProcessor struct {
timeout time.Duration
// convert into bytes
- messagesChan chan *Message
+ messagesChan chan *messages.Message
builders chan *flatbuffers.Builder
batchMessageCountLimit int
@@ -51,7 +52,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
batchMessageCountLimit: batchMessageCountLimit,
builders: make(chan *flatbuffers.Builder, batchCountLimit),
- messagesChan: make(chan *Message, 1024),
+ messagesChan: make(chan *messages.Message, 1024),
doneChan: make(chan struct{}),
timeout: timeout,
}
@@ -62,7 +63,7 @@ func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration
return t
}
-func (p *PublishStreamProcessor) AddMessage(m *Message) error {
+func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error {
p.messagesChan <- m
return nil
}
@@ -72,7 +73,7 @@ func (p *PublishStreamProcessor) Shutdown() error {
return nil
}
-func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error {
+func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error {
if len(messages) == 0 {
return nil
@@ -102,7 +103,7 @@ func (p *PublishStreamProcessor) doLoopUpload() {
brokerGrpcAddress := "localhost:17777"
// TOOD parallelize the uploading with separate uploader
- messages := make([]*Message, 0, p.batchMessageCountLimit)
+ messages := make([]*messages.Message, 0, p.batchMessageCountLimit)
util.RetryForever("publish message", func() error {
return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
diff --git a/weed/mq/client/publisher.go b/weed/mq/client/publisher.go
index 30de47665..826947721 100644
--- a/weed/mq/client/publisher.go
+++ b/weed/mq/client/publisher.go
@@ -1,12 +1,13 @@
package client
import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/messages"
"github.com/seaweedfs/seaweedfs/weed/pb"
"time"
)
type PublishProcessor interface {
- AddMessage(m *Message) error
+ AddMessage(m *messages.Message) error
Shutdown() error
}
@@ -30,14 +31,7 @@ func NewPublisher(option *PublisherOption) *Publisher {
return p
}
-type Message struct {
- Key []byte
- Content []byte
- Properties map[string]string
- Ts time.Time
-}
-
-func (p Publisher) Publish(m *Message) error {
+func (p Publisher) Publish(m *messages.Message) error {
return p.processor.AddMessage(m)
}
diff --git a/weed/mq/cmd/qsend/qsend.go b/weed/mq/cmd/qsend/qsend.go
index 34f7e6dc5..c80b220b8 100644
--- a/weed/mq/cmd/qsend/qsend.go
+++ b/weed/mq/cmd/qsend/qsend.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/messages"
"os"
"time"
)
@@ -24,7 +25,7 @@ func main() {
err := eachLineStdin(func(line string) error {
if len(line) > 0 {
- if err := publisher.Publish(&client.Message{
+ if err := publisher.Publish(&messages.Message{
Key: nil,
Content: []byte(line),
Properties: nil,
diff --git a/weed/mq/messages/messages.go b/weed/mq/messages/messages.go
new file mode 100644
index 000000000..b3bd66f52
--- /dev/null
+++ b/weed/mq/messages/messages.go
@@ -0,0 +1,10 @@
+package messages
+
+import "time"
+
+type Message struct {
+ Key []byte
+ Content []byte
+ Properties map[string]string
+ Ts time.Time
+}