diff options
Diffstat (limited to 'weed/mq/broker/brokder_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/brokder_grpc_pub.go | 33 |
1 files changed, 32 insertions, 1 deletions
diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go index a26be5171..cbcc83f9b 100644 --- a/weed/mq/broker/brokder_grpc_pub.go +++ b/weed/mq/broker/brokder_grpc_pub.go @@ -1,6 +1,7 @@ package broker import ( + "github.com/seaweedfs/seaweedfs/weed/pb/message_fbs" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -11,6 +12,36 @@ The messages is buffered in memory, and saved to filer under /topics/<topic>/info/segment_<id>.meta */ -func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { +func (broker *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error { + println("connected") + for { + request, recvErr := stream.Recv() + if recvErr != nil { + return recvErr + } + + print(">") + if request.Control != nil { + + } + if request.Data != nil { + if err := broker.processDataMessage(stream, request.Data); err != nil { + return err + } + } + + } + return nil +} + +func (broker *MessageQueueBroker) processDataMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer, data *mq_pb.PublishRequest_DataMessage) error { + mb := message_fbs.GetRootAsMessageBatch(data.Message, 0) + + println("message count:", mb.MessagesLength(), len(data.Message)) + m := &message_fbs.Message{} + for i := 0; i < mb.MessagesLength(); i++ { + mb.Messages(m, i) + println(i, ">", string(m.Data())) + } return nil } |
