diff options
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/msg_broker_grpc_server.go | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/weed/messaging/msg_broker_grpc_server.go b/weed/messaging/msg_broker_grpc_server.go index a29bc11b0..5b93b8f62 100644 --- a/weed/messaging/msg_broker_grpc_server.go +++ b/weed/messaging/msg_broker_grpc_server.go @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util/log_buffer" ) -func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error { +func (broker *MessageBroker) Subscribe(server messaging_pb.SeaweedMessaging_SubscribeServer) error { panic("implement me") } @@ -28,14 +28,16 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis if err != nil { return err } - namespace, topic, partition := in.Namespace, in.Topic, in.Partition + namespace, topic, partition := in.Init.Namespace, in.Init.Topic, in.Init.Partition updatesChan := make(chan int32) go func() { for update := range updatesChan { if err := stream.Send(&messaging_pb.PublishResponse{ - PartitionCount: update, + Config: &messaging_pb.PublishResponse_ConfigMessage{ + PartitionCount: update, + }, }); err != nil { glog.V(0).Infof("err sending publish response: %v", err) return @@ -73,9 +75,9 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis m := &messaging_pb.Message{ Timestamp: time.Now().UnixNano(), - Key: in.Key, - Value: in.Value, - Headers: in.Headers, + Key: in.Data.Key, + Value: in.Data.Value, + Headers: in.Data.Headers, } data, err := proto.Marshal(m) @@ -84,7 +86,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis continue } - logBuffer.AddToBuffer(in.Key, data) + logBuffer.AddToBuffer(in.Data.Key, data) } } |
