Diffstat (limited to 'weed/mq')
 weed/mq/broker/brokder_grpc_pub.go          |  33
 weed/mq/client/publish_stream_processor.go  | 179
 weed/mq/client/publisher.go                 |  40
 weed/mq/cmd/qsend/qsend.go                  |  62
 weed/mq/messages/message_buffer.go          |  53
 weed/mq/messages/message_buffer_mover.go    |  32
 weed/mq/messages/message_pipeline.go        | 163
 weed/mq/messages/message_pipeline_test.go   |  53
 weed/mq/messages/messages.go                |  10
 weed/mq/segment/message_serde.go            |  37
 weed/mq/segment/message_serde_test.go       |  10
 11 files changed, 645 insertions(+), 27 deletions(-)
diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go
index a26be5171..cbcc83f9b 100644
--- a/weed/mq/broker/brokder_grpc_pub.go
+++ b/weed/mq/broker/brokder_grpc_pub.go
@@ -1,6 +1,7 @@
 package broker
 
 import (
+    "github.com/seaweedfs/seaweedfs/weed/pb/message_fbs"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 )
 
@@ -11,6 +12,36 @@
 The messages is buffered in memory, and saved to filer under
     /topics/<topic>/info/segment_<id>.meta
 */
-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (broker *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
+    println("connected")
+    for {
+        request, recvErr := stream.Recv()
+        if recvErr != nil {
+            return recvErr
+        }
+
+        print(">")
+        if request.Control != nil {
+
+        }
+        if request.Data != nil {
+            if err := broker.processDataMessage(stream, request.Data); err != nil {
+                return err
+            }
+        }
+
+    }
+    return nil
+}
+
+func (broker *MessageQueueBroker) processDataMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer, data *mq_pb.PublishRequest_DataMessage) error {
+    mb := message_fbs.GetRootAsMessageBatch(data.Message, 0)
+
+    println("message count:", mb.MessagesLength(), len(data.Message))
+    m := &message_fbs.Message{}
+    for i := 0; i < mb.MessagesLength(); i++ {
+        mb.Messages(m, i)
+        println(i, ">", string(m.Data()))
+    }
     return nil
 }
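The new `PublishMessage` handler above only decodes and prints each batch; nothing is sent back on the stream, even though the client code further down already logs a `resp.AckSequence` from the response stream. A minimal sketch of what a server-side acknowledgement could look like — assuming the response type is `mq_pb.PublishResponse` with an `AckSequence` field, which is implied by the client code but not confirmed by this commit:

```go
// Sketch only: acknowledge a processed batch back to the publisher.
// Assumes mq_pb.PublishResponse carries an AckSequence field, as implied
// by the client-side resp.AckSequence; not part of this commit.
func (broker *MessageQueueBroker) ackBatch(stream mq_pb.SeaweedMessaging_PublishMessageServer, ackSequence int64) error {
	return stream.Send(&mq_pb.PublishResponse{
		AckSequence: ackSequence,
	})
}
```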
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
new file mode 100644
index 000000000..7aefa6b86
--- /dev/null
+++ b/weed/mq/client/publish_stream_processor.go
@@ -0,0 +1,179 @@
+package client
+
+import (
+    "context"
+    flatbuffers "github.com/google/flatbuffers/go"
+    "github.com/seaweedfs/seaweedfs/weed/mq/messages"
+    "github.com/seaweedfs/seaweedfs/weed/mq/segment"
+    "github.com/seaweedfs/seaweedfs/weed/pb"
+    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "github.com/seaweedfs/seaweedfs/weed/util"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials/insecure"
+    "log"
+    "sync"
+    "sync/atomic"
+    "time"
+)
+
+const (
+    batchCountLimit = 3
+)
+
+type PublishStreamProcessor struct {
+    // attributes
+    ProducerId     int32
+    ProducerEpoch  int32
+    grpcDialOption grpc.DialOption
+
+    // input
+    sync.Mutex
+
+    timeout time.Duration
+
+    // convert into bytes
+    messagesChan           chan *messages.Message
+    builders               chan *flatbuffers.Builder
+    batchMessageCountLimit int
+
+    messagesSequence int64
+
+    // done channel
+    doneChan chan struct{}
+}
+
+type UploadProcess struct {
+    bufferBuilder *flatbuffers.Builder
+    batchBuilder  *segment.MessageBatchBuilder
+}
+
+func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration) *PublishStreamProcessor {
+    t := &PublishStreamProcessor{
+        grpcDialOption:         grpc.WithTransportCredentials(insecure.NewCredentials()),
+        batchMessageCountLimit: batchMessageCountLimit,
+        builders:               make(chan *flatbuffers.Builder, batchCountLimit),
+        messagesChan:           make(chan *messages.Message, 1024),
+        doneChan:               make(chan struct{}),
+        timeout:                timeout,
+    }
+    for i := 0; i < batchCountLimit; i++ {
+        t.builders <- flatbuffers.NewBuilder(4 * 1024 * 1024)
+    }
+    go t.doLoopUpload()
+    return t
+}
+
+func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error {
+    p.messagesChan <- m
+    return nil
+}
+
+func (p *PublishStreamProcessor) Shutdown() error {
+    p.doneChan <- struct{}{}
+    return nil
+}
+
+func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error {
+
+    if len(messages) == 0 {
+        return nil
+    }
+
+    builder := <-p.builders
+    bb := segment.NewMessageBatchBuilder(builder)
+    for _, m := range messages {
+        bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
+        p.messagesSequence++
+    }
+    bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4)
+    defer func() {
+        p.builders <- builder
+    }()
+
+    return stream.Send(&mq_pb.PublishRequest{
+        Data: &mq_pb.PublishRequest_DataMessage{
+            Message: bb.GetBytes(),
+        },
+    })
+
+}
+
+func (p *PublishStreamProcessor) doLoopUpload() {
+
+    brokerGrpcAddress := "localhost:17777"
+
+    // TODO parallelize the uploading with separate uploader
+    messages := make([]*messages.Message, 0, p.batchMessageCountLimit)
+
+    util.RetryForever("publish message", func() error {
+        return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+
+            ctx, cancel := context.WithCancel(context.Background())
+            defer cancel()
+
+            stream, err := client.PublishMessage(ctx)
+            if err != nil {
+                log.Printf("grpc PublishMessage: %v", err)
+                return err
+            }
+
+            var atomicStatus int64
+            go func() {
+                resp, err := stream.Recv()
+                if err != nil {
+                    log.Printf("response error: %v", err)
+                } else {
+                    log.Printf("response: %v", resp.AckSequence)
+                }
+                if atomic.LoadInt64(&atomicStatus) < 0 {
+                    return
+                }
+            }()
+
+            var flushErr error
+            // retry previously failed messages
+            if len(messages) >= p.batchMessageCountLimit {
+                flushErr = p.doFlush(stream, messages)
+                if flushErr != nil {
+                    return flushErr
+                }
+                messages = messages[:0]
+            }
+
+            for {
+                select {
+                case m := <-p.messagesChan:
+                    messages = append(messages, m)
+                    if len(messages) >= p.batchMessageCountLimit {
+                        if flushErr = p.doFlush(stream, messages); flushErr != nil {
+                            return flushErr
+                        }
+                        messages = messages[:0]
+                    }
+                case <-time.After(p.timeout):
+                    if flushErr = p.doFlush(stream, messages); flushErr != nil {
+                        return flushErr
+                    }
+                    messages = messages[:0]
+                case <-p.doneChan:
+                    if flushErr = p.doFlush(stream, messages); flushErr != nil {
+                        return flushErr
+                    }
+                    messages = messages[:0]
+                    println("$ stopping ...")
+                    break
+                }
+            }
+
+            // stop the response consuming goroutine
+            atomic.StoreInt64(&atomicStatus, -1)
+
+            return flushErr
+
+        })
+    }, func(err error) (shouldContinue bool) {
+        log.Printf("failed with grpc %s: %v", brokerGrpcAddress, err)
+        return true
+    })
+
+}
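The processor flushes on three triggers: the batch reaching `batchMessageCountLimit`, the `timeout` elapsing, or `Shutdown()`. It also leases FlatBuffers builders from a buffered channel of size `batchCountLimit` (3) and returns each one after the flush, capping builder memory at three 4 MB buffers. One Go subtlety worth noting: the `break` in the `case <-p.doneChan` arm only exits the `select`, not the enclosing `for`, so the cleanup after the loop is unreachable as written. A hedged usage sketch of the processor on its own (the broker address is currently hard-coded to localhost:17777 in `doLoopUpload`):

```go
// Minimal sketch using only the API added in this commit.
p := client.NewPublishStreamProcessor(3, 887*time.Millisecond)
_ = p.AddMessage(&messages.Message{
	Content: []byte("hello"),
	Ts:      time.Now(),
})
_ = p.Shutdown() // signals the upload loop to flush buffered messages
```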
diff --git a/weed/mq/client/publisher.go b/weed/mq/client/publisher.go
new file mode 100644
index 000000000..826947721
--- /dev/null
+++ b/weed/mq/client/publisher.go
@@ -0,0 +1,40 @@
+package client
+
+import (
+    "github.com/seaweedfs/seaweedfs/weed/mq/messages"
+    "github.com/seaweedfs/seaweedfs/weed/pb"
+    "time"
+)
+
+type PublishProcessor interface {
+    AddMessage(m *messages.Message) error
+    Shutdown() error
+}
+
+type PublisherOption struct {
+    Masters string
+    Topic   string
+}
+
+type Publisher struct {
+    option    *PublisherOption
+    masters   []pb.ServerAddress
+    processor *PublishStreamProcessor
+}
+
+func NewPublisher(option *PublisherOption) *Publisher {
+    p := &Publisher{
+        masters:   pb.ServerAddresses(option.Masters).ToAddresses(),
+        option:    option,
+        processor: NewPublishStreamProcessor(3, 887*time.Millisecond),
+    }
+    return p
+}
+
+func (p Publisher) Publish(m *messages.Message) error {
+    return p.processor.AddMessage(m)
+}
+
+func (p Publisher) Shutdown() error {
+    return p.processor.Shutdown()
+}
diff --git a/weed/mq/cmd/qsend/qsend.go b/weed/mq/cmd/qsend/qsend.go
new file mode 100644
index 000000000..c80b220b8
--- /dev/null
+++ b/weed/mq/cmd/qsend/qsend.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+    "bufio"
+    "flag"
+    "fmt"
+    "github.com/seaweedfs/seaweedfs/weed/mq/client"
+    "github.com/seaweedfs/seaweedfs/weed/mq/messages"
+    "os"
+    "time"
+)
+
+var (
+    master = flag.String("master", "localhost:9333", "master csv list")
+    topic  = flag.String("topic", "", "topic name")
+)
+
+func main() {
+    flag.Parse()
+
+    publisher := client.NewPublisher(&client.PublisherOption{
+        Masters: *master,
+        Topic:   *topic,
+    })
+
+    err := eachLineStdin(func(line string) error {
+        if len(line) > 0 {
+            if err := publisher.Publish(&messages.Message{
+                Key:        nil,
+                Content:    []byte(line),
+                Properties: nil,
+                Ts:         time.Time{},
+            }); err != nil {
+                return err
+            }
+        }
+        return nil
+    })
+
+    publisher.Shutdown()
+
+    if err != nil {
+        fmt.Printf("error: %v\n", err)
+    }
+}
+
+func eachLineStdin(eachLineFn func(string) error) error {
+    scanner := bufio.NewScanner(os.Stdin)
+    for scanner.Scan() {
+        text := scanner.Text()
+        if err := eachLineFn(text); err != nil {
+            return err
+        }
+    }
+
+    // handle error
+    if scanner.Err() != nil {
+        return fmt.Errorf("scan stdin: %v", scanner.Err())
+    }
+
+    return nil
+}
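Together these form a small publish API: `qsend` reads stdin lines (e.g. `cat events.log | qsend -topic=test`) and hands each one to `Publisher.Publish`, which delegates to the stream processor. A hedged sketch of calling the same API programmatically, with placeholder master/topic values and the optional `Message` fields filled in; note that `Topic` is carried in `PublisherOption` but not yet consumed by the stream processor at this stage:

```go
// Sketch: programmatic use of the client API added in this commit.
publisher := client.NewPublisher(&client.PublisherOption{
	Masters: "localhost:9333", // placeholder master list
	Topic:   "test",           // placeholder topic
})
if err := publisher.Publish(&messages.Message{
	Key:        []byte("user-42"),
	Content:    []byte(`{"event":"signup"}`),
	Properties: map[string]string{"source": "example"},
	Ts:         time.Now(),
}); err != nil {
	log.Fatal(err)
}
publisher.Shutdown()
```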
diff --git a/weed/mq/messages/message_buffer.go b/weed/mq/messages/message_buffer.go
new file mode 100644
index 000000000..9e61d0dfb
--- /dev/null
+++ b/weed/mq/messages/message_buffer.go
@@ -0,0 +1,53 @@
+package messages
+
+import (
+    flatbuffers "github.com/google/flatbuffers/go"
+    "github.com/seaweedfs/seaweedfs/weed/mq/segment"
+)
+
+type MessageBuffer struct {
+    fbsBuffer    *flatbuffers.Builder
+    sequenceBase int64
+    counter      int64
+    bb           *segment.MessageBatchBuilder
+    isSealed     bool
+}
+
+func NewMessageBuffer() *MessageBuffer {
+    t := &MessageBuffer{
+        fbsBuffer: flatbuffers.NewBuilder(4 * 1024 * 1024),
+    }
+    t.bb = segment.NewMessageBatchBuilder(t.fbsBuffer)
+    return t
+}
+
+func (mb *MessageBuffer) Reset(sequenceBase int64) {
+    mb.sequenceBase = sequenceBase
+    mb.counter = 0
+    mb.bb.Reset()
+}
+
+func (mb *MessageBuffer) AddMessage(message *Message) {
+    mb.bb.AddMessage(mb.sequenceBase, message.Ts.UnixMilli(), message.Properties, message.Key, message.Content)
+    mb.sequenceBase++
+    mb.counter++
+}
+
+func (mb *MessageBuffer) Len() int {
+    return int(mb.counter)
+}
+
+func (mb *MessageBuffer) Seal(producerId int32,
+    producerEpoch int32,
+    segmentId int32,
+    flags int32) {
+    mb.isSealed = true
+    mb.bb.BuildMessageBatch(producerId, producerEpoch, segmentId, flags)
+}
+
+func (mb *MessageBuffer) Bytes() []byte {
+    if !mb.isSealed {
+        return nil
+    }
+    return mb.bb.GetBytes()
+}
diff --git a/weed/mq/messages/message_buffer_mover.go b/weed/mq/messages/message_buffer_mover.go
new file mode 100644
index 000000000..6c29db095
--- /dev/null
+++ b/weed/mq/messages/message_buffer_mover.go
@@ -0,0 +1,32 @@
+package messages
+
+import "fmt"
+
+type MessageBufferMover interface {
+    Setup()
+    TearDown()
+    MoveBuffer(buffer *MessageBuffer) (MessageBufferReference, error) // should be thread-safe
+}
+type MessageBufferReference struct {
+    sequence int64
+    fileId   string
+}
+
+var _ = MessageBufferMover(&EmptyMover{})
+
+type EmptyMover struct {
+}
+
+func (e *EmptyMover) Setup() {
+}
+
+func (e *EmptyMover) TearDown() {
+}
+
+func (e *EmptyMover) MoveBuffer(buffer *MessageBuffer) (MessageBufferReference, error) {
+    println("moving", buffer.sequenceBase)
+    return MessageBufferReference{
+        sequence: buffer.sequenceBase,
+        fileId:   fmt.Sprintf("buffer %d", buffer.sequenceBase),
+    }, nil
+}
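`EmptyMover` only fabricates a `fileId`, but the `MessageBufferMover` interface is the extension point for real persistence. A hypothetical mover, for illustration only (not in this commit), that writes each sealed buffer to a local file; it would live in the `messages` package since the fields of `MessageBufferReference` are unexported:

```go
package messages

import (
	"fmt"
	"os"
	"path/filepath"
)

// LocalFileMover is a hypothetical MessageBufferMover that persists each
// sealed buffer to a file and returns the path as the fileId.
type LocalFileMover struct {
	Dir string
}

func (m *LocalFileMover) Setup()    {}
func (m *LocalFileMover) TearDown() {}

func (m *LocalFileMover) MoveBuffer(buffer *MessageBuffer) (MessageBufferReference, error) {
	name := filepath.Join(m.Dir, fmt.Sprintf("buffer_%d.fbs", buffer.sequenceBase))
	// Bytes() returns nil unless the buffer was sealed; the pipeline seals
	// every buffer before handing it to the mover.
	if err := os.WriteFile(name, buffer.Bytes(), 0o644); err != nil {
		return MessageBufferReference{}, err
	}
	return MessageBufferReference{sequence: buffer.sequenceBase, fileId: name}, nil
}
```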
diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go
new file mode 100644
index 000000000..be967b32e
--- /dev/null
+++ b/weed/mq/messages/message_pipeline.go
@@ -0,0 +1,163 @@
+package messages
+
+import (
+    "github.com/seaweedfs/seaweedfs/weed/util"
+    "google.golang.org/grpc"
+    "log"
+    "sync"
+    "sync/atomic"
+    "time"
+)
+
+type OnMessageFunc func(message *Message)
+
+type MessagePipeline struct {
+    // atomic status
+    atomicPipelineStatus int64 // -1: stop
+
+    // attributes
+    ProducerId     int32
+    ProducerEpoch  int32
+    grpcDialOption grpc.DialOption
+
+    emptyBuffersChan  chan *MessageBuffer
+    sealedBuffersChan chan *MessageBuffer
+    movedBuffersChan  chan MessageBufferReference
+    onMessageFn       OnMessageFunc
+    mover             MessageBufferMover
+    moverPool         *util.LimitedAsyncExecutor
+
+    // control pipeline
+    doneChan  chan struct{}
+    batchSize int
+    timeout   time.Duration
+
+    incomingMessageLock   sync.Mutex
+    incomingMessageBuffer *MessageBuffer
+
+    messageSequence int64
+}
+
+func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeout time.Duration, mover MessageBufferMover) *MessagePipeline {
+    t := &MessagePipeline{
+        ProducerId:        producerId,
+        emptyBuffersChan:  make(chan *MessageBuffer, workerCount),
+        sealedBuffersChan: make(chan *MessageBuffer, workerCount),
+        movedBuffersChan:  make(chan MessageBufferReference, workerCount),
+        doneChan:          make(chan struct{}),
+        batchSize:         batchSize,
+        timeout:           timeout,
+        moverPool:         util.NewLimitedAsyncExecutor(workerCount),
+        mover:             mover,
+    }
+    go t.doLoopUpload()
+    return t
+}
+
+func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference {
+    return mp.moverPool.NextFuture().Await().(MessageBufferReference)
+}
+
+func (mp *MessagePipeline) AddMessage(message *Message) {
+    mp.incomingMessageLock.Lock()
+    defer mp.incomingMessageLock.Unlock()
+
+    // get existing message buffer or create a new one
+    if mp.incomingMessageBuffer == nil {
+        select {
+        case mp.incomingMessageBuffer = <-mp.emptyBuffersChan:
+        default:
+            mp.incomingMessageBuffer = NewMessageBuffer()
+        }
+        mp.incomingMessageBuffer.Reset(mp.messageSequence)
+    }
+
+    // add one message
+    mp.incomingMessageBuffer.AddMessage(message)
+    mp.messageSequence++
+
+    // seal the message buffer if full
+    if mp.incomingMessageBuffer.Len() >= mp.batchSize {
+        mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
+        mp.sealedBuffersChan <- mp.incomingMessageBuffer
+        mp.incomingMessageBuffer = nil
+    }
+}
+
+func (mp *MessagePipeline) doLoopUpload() {
+
+    mp.mover.Setup()
+    defer mp.mover.TearDown()
+
+    ticker := time.NewTicker(mp.timeout)
+    for {
+        status := atomic.LoadInt64(&mp.atomicPipelineStatus)
+        if status == -100 {
+            return
+        } else if status == -1 {
+            // entering shutting down mode
+            atomic.StoreInt64(&mp.atomicPipelineStatus, -2)
+            mp.incomingMessageLock.Lock()
+            mp.doFlushIncomingMessages()
+            mp.incomingMessageLock.Unlock()
+        }
+
+        select {
+        case messageBuffer := <-mp.sealedBuffersChan:
+            ticker.Reset(mp.timeout)
+            mp.moverPool.Execute(func() any {
+                var output MessageBufferReference
+                util.RetryForever("message mover", func() error {
+                    if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil {
+                        return flushErr
+                    } else {
+                        output = messageReference
+                    }
+                    return nil
+                }, func(err error) (shouldContinue bool) {
+                    log.Printf("failed: %v", err)
+                    return true
+                })
+                return output
+            })
+        case <-ticker.C:
+            if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 {
+                atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
+                return
+            }
+            mp.incomingMessageLock.Lock()
+            mp.doFlushIncomingMessages()
+            mp.incomingMessageLock.Unlock()
+        }
+    }
+
+    atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
+    close(mp.movedBuffersChan)
+
+}
+
+func (mp *MessagePipeline) doFlushIncomingMessages() {
+    if mp.incomingMessageBuffer == nil || mp.incomingMessageBuffer.Len() == 0 {
+        return
+    }
+    mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
+    mp.sealedBuffersChan <- mp.incomingMessageBuffer
+    mp.incomingMessageBuffer = nil
+}
+
+func (mp *MessagePipeline) ShutdownStart() {
+    if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
+        atomic.StoreInt64(&mp.atomicPipelineStatus, -1)
+    }
+}
+func (mp *MessagePipeline) ShutdownWait() {
+    for atomic.LoadInt64(&mp.atomicPipelineStatus) != -100 {
+        time.Sleep(331 * time.Millisecond)
+    }
+}
+
+func (mp *MessagePipeline) ShutdownImmediate() {
+    if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
+        atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
+    }
+}
diff --git a/weed/mq/messages/message_pipeline_test.go b/weed/mq/messages/message_pipeline_test.go
new file mode 100644
index 000000000..e7ecfa8f6
--- /dev/null
+++ b/weed/mq/messages/message_pipeline_test.go
@@ -0,0 +1,53 @@
+package messages
+
+import (
+    "testing"
+    "time"
+)
+
+func TestAddMessage1(t *testing.T) {
+    mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{})
+    go func() {
+        for i := 0; i < 100; i++ {
+            mr := mp.NextMessageBufferReference()
+            println(mr.sequence, mr.fileId)
+        }
+    }()
+
+    for i := 0; i < 100; i++ {
+        message := &Message{
+            Key:        []byte("key"),
+            Content:    []byte("data"),
+            Properties: nil,
+            Ts:         time.Now(),
+        }
+        mp.AddMessage(message)
+    }
+
+    mp.ShutdownStart()
+    mp.ShutdownWait()
+
+}
+
+func TestAddMessage2(t *testing.T) {
+    mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{})
+
+    for i := 0; i < 100; i++ {
+        message := &Message{
+            Key:        []byte("key"),
+            Content:    []byte("data"),
+            Properties: nil,
+            Ts:         time.Now(),
+        }
+        mp.AddMessage(message)
+    }
+
+    mp.ShutdownStart()
+    mp.ShutdownWait()
+
+    for i := 0; i < 100; i++ {
+        mr := mp.NextMessageBufferReference()
+        println(mr.sequence, mr.fileId)
+    }
+
+}
diff --git a/weed/mq/messages/messages.go b/weed/mq/messages/messages.go
new file mode 100644
index 000000000..b3bd66f52
--- /dev/null
+++ b/weed/mq/messages/messages.go
@@ -0,0 +1,10 @@
+package messages
+
+import "time"
+
+type Message struct {
+    Key        []byte
+    Content    []byte
+    Properties map[string]string
+    Ts         time.Time
+}
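The pipeline's shutdown is driven by `atomicPipelineStatus`, which the code reads and writes as raw literals. Reading `doLoopUpload` together with the three `Shutdown*` methods, the values form a small state machine; naming them (illustrative constants only, not part of this commit) makes the flow easier to follow:

```go
// Illustrative names for the atomicPipelineStatus values used above.
const (
	statusRunning         = 0    // accepting messages, flushing on size/timeout
	statusShutdownStarted = -1   // ShutdownStart called; flush incoming buffer once
	statusDraining        = -2   // flushed; exit on the next idle ticker tick
	statusStopped         = -100 // loop exited; ShutdownWait returns
)
```

The two tests exercise both consumption patterns: `TestAddMessage1` drains `NextMessageBufferReference` concurrently with publishing, while `TestAddMessage2` drains after shutdown.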
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go
index 66a76c57d..bb979a2a8 100644
--- a/weed/mq/segment/message_serde.go
+++ b/weed/mq/segment/message_serde.go
@@ -7,10 +7,6 @@ import (
 
 type MessageBatchBuilder struct {
     b              *flatbuffers.Builder
-    producerId     int32
-    producerEpoch  int32
-    segmentId      int32
-    flags          int32
     messageOffsets []flatbuffers.UOffsetT
     segmentSeqBase int64
     segmentSeqLast int64
@@ -18,24 +14,20 @@ type MessageBatchBuilder struct {
     tsMsLast       int64
 }
 
-func NewMessageBatchBuilder(b *flatbuffers.Builder,
-    producerId int32,
-    producerEpoch int32,
-    segmentId int32,
-    flags int32) *MessageBatchBuilder {
+func NewMessageBatchBuilder(b *flatbuffers.Builder) *MessageBatchBuilder {
     b.Reset()
     return &MessageBatchBuilder{
-        b:             b,
-        producerId:    producerId,
-        producerEpoch: producerEpoch,
-        segmentId:     segmentId,
-        flags:         flags,
+        b: b,
     }
 }
 
-func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) {
+func (builder *MessageBatchBuilder) Reset() {
+    builder.b.Reset()
+}
+
+func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string]string, key []byte, value []byte) {
     if builder.segmentSeqBase == 0 {
         builder.segmentSeqBase = segmentSeq
     }
@@ -48,7 +40,7 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
     var names, values, pairs []flatbuffers.UOffsetT
     for k, v := range properties {
         names = append(names, builder.b.CreateString(k))
-        values = append(values, builder.b.CreateByteVector(v))
+        values = append(values, builder.b.CreateString(v))
     }
     for i, _ := range names {
         message_fbs.NameValueStart(builder.b)
@@ -80,7 +72,10 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
 
 }
 
-func (builder *MessageBatchBuilder) BuildMessageBatch() {
+func (builder *MessageBatchBuilder) BuildMessageBatch(producerId int32,
+    producerEpoch int32,
+    segmentId int32,
+    flags int32) {
     message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets))
     for i := len(builder.messageOffsets) - 1; i >= 0; i-- {
         builder.b.PrependUOffsetT(builder.messageOffsets[i])
@@ -88,10 +83,10 @@
     messagesOffset := builder.b.EndVector(len(builder.messageOffsets))
 
     message_fbs.MessageBatchStart(builder.b)
-    message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId)
-    message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch)
-    message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId)
-    message_fbs.MessageBatchAddFlags(builder.b, builder.flags)
+    message_fbs.MessageBatchAddProducerId(builder.b, producerId)
+    message_fbs.MessageBatchAddProducerEpoch(builder.b, producerEpoch)
+    message_fbs.MessageBatchAddSegmentId(builder.b, segmentId)
+    message_fbs.MessageBatchAddFlags(builder.b, flags)
     message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase)
     message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase))
     message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase)
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index c65bffb84..8849b393b 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -10,15 +10,15 @@ import (
 func TestMessageSerde(t *testing.T) {
     b := flatbuffers.NewBuilder(1024)
 
-    prop := make(map[string][]byte)
-    prop["n1"] = []byte("v1")
-    prop["n2"] = []byte("v2")
+    prop := make(map[string]string)
+    prop["n1"] = "v1"
+    prop["n2"] = "v2"
 
-    bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
+    bb := NewMessageBatchBuilder(b)
 
     bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
 
-    bb.BuildMessageBatch()
+    bb.BuildMessageBatch(1, 2, 3, 4)
 
     buf := bb.GetBytes()
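One subtlety in `AddMessage`: `properties` is serialized by ranging over a Go map, and Go randomizes map iteration order, so two batches built from identical messages can differ byte-for-byte. If deterministic batch bytes ever matter (checksums, dedup, golden tests), sorting the keys first would fix it — a sketch, not part of this commit:

```go
// Sketch: deterministic property ordering inside AddMessage.
// Requires importing "sort"; names/values are the slices built above.
keys := make([]string, 0, len(properties))
for k := range properties {
	keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
	names = append(names, builder.b.CreateString(k))
	values = append(values, builder.b.CreateString(properties[k]))
}
```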
