diff options
Diffstat (limited to 'weed/mq/client/publish_stream_processor.go')
| -rw-r--r-- | weed/mq/client/publish_stream_processor.go | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go new file mode 100644 index 000000000..7aefa6b86 --- /dev/null +++ b/weed/mq/client/publish_stream_processor.go @@ -0,0 +1,179 @@ +package client + +import ( + "context" + flatbuffers "github.com/google/flatbuffers/go" + "github.com/seaweedfs/seaweedfs/weed/mq/messages" + "github.com/seaweedfs/seaweedfs/weed/mq/segment" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "log" + "sync" + "sync/atomic" + "time" +) + +const ( + batchCountLimit = 3 +) + +type PublishStreamProcessor struct { + // attributes + ProducerId int32 + ProducerEpoch int32 + grpcDialOption grpc.DialOption + + // input + sync.Mutex + + timeout time.Duration + + // convert into bytes + messagesChan chan *messages.Message + builders chan *flatbuffers.Builder + batchMessageCountLimit int + + messagesSequence int64 + + // done channel + doneChan chan struct{} +} + +type UploadProcess struct { + bufferBuilder *flatbuffers.Builder + batchBuilder *segment.MessageBatchBuilder +} + +func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration) *PublishStreamProcessor { + t := &PublishStreamProcessor{ + grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + batchMessageCountLimit: batchMessageCountLimit, + builders: make(chan *flatbuffers.Builder, batchCountLimit), + messagesChan: make(chan *messages.Message, 1024), + doneChan: make(chan struct{}), + timeout: timeout, + } + for i := 0; i < batchCountLimit; i++ { + t.builders <- flatbuffers.NewBuilder(4 * 1024 * 1024) + } + go t.doLoopUpload() + return t +} + +func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error { + p.messagesChan <- m + return nil +} + +func (p *PublishStreamProcessor) Shutdown() error { + p.doneChan <- struct{}{} + return nil +} + +func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error { + + if len(messages) == 0 { + return nil + } + + builder := <-p.builders + bb := segment.NewMessageBatchBuilder(builder) + for _, m := range messages { + bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content) + p.messagesSequence++ + } + bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4) + defer func() { + p.builders <- builder + }() + + return stream.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.PublishRequest_DataMessage{ + Message: bb.GetBytes(), + }, + }) + +} + +func (p *PublishStreamProcessor) doLoopUpload() { + + brokerGrpcAddress := "localhost:17777" + + // TOOD parallelize the uploading with separate uploader + messages := make([]*messages.Message, 0, p.batchMessageCountLimit) + + util.RetryForever("publish message", func() error { + return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.PublishMessage(ctx) + if err != nil { + log.Printf("grpc PublishMessage: %v", err) + return err + } + + var atomicStatus int64 + go func() { + resp, err := stream.Recv() + if err != nil { + log.Printf("response error: %v", err) + } else { + log.Printf("response: %v", resp.AckSequence) + } + if atomic.LoadInt64(&atomicStatus) < 0 { + return + } + }() + + var flushErr error + // retry previously failed messages + if len(messages) >= p.batchMessageCountLimit { + flushErr = p.doFlush(stream, messages) + if flushErr != nil { + return flushErr + } + messages = messages[:0] + } + + for { + select { + case m := <-p.messagesChan: + messages = append(messages, m) + if len(messages) >= p.batchMessageCountLimit { + if flushErr = p.doFlush(stream, messages); flushErr != nil { + return flushErr + } + messages = messages[:0] + } + case <-time.After(p.timeout): + if flushErr = p.doFlush(stream, messages); flushErr != nil { + return flushErr + } + messages = messages[:0] + case <-p.doneChan: + if flushErr = p.doFlush(stream, messages); flushErr != nil { + return flushErr + } + messages = messages[:0] + println("$ stopping ...") + break + } + } + + // stop the response consuming goroutine + atomic.StoreInt64(&atomicStatus, -1) + + return flushErr + + }) + }, func(err error) (shouldContinue bool) { + log.Printf("failed with grpc %s: %v", brokerGrpcAddress, err) + return true + }) + +} |
