diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_server_publish.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_server_publish.go | 112 |
1 files changed, 0 insertions, 112 deletions
diff --git a/weed/mq/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go deleted file mode 100644 index eb76dd5dc..000000000 --- a/weed/mq/broker/broker_grpc_server_publish.go +++ /dev/null @@ -1,112 +0,0 @@ -package broker - -import ( - "crypto/md5" - "fmt" - "io" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" -) - -func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { - - // process initial request - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - // TODO look it up - topicConfig := &mq_pb.TopicConfiguration{ - // IsTransient: true, - } - - // send init response - initResponse := &mq_pb.PublishResponse{ - Config: nil, - Redirect: nil, - } - err = stream.Send(initResponse) - if err != nil { - return err - } - if initResponse.Redirect != nil { - return nil - } - - // get lock - tp := TopicPartition{ - Namespace: in.Init.Namespace, - Topic: in.Init.Topic, - Partition: in.Init.Partition, - } - - tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic) - md5File := fmt.Sprintf("p%02d.md5", tp.Partition) - // println("chan data stored under", tpDir, "as", md5File) - - if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists { - return fmt.Errorf("channel is already closed") - } - - tl := broker.topicManager.RequestLock(tp, topicConfig, true) - defer broker.topicManager.ReleaseLock(tp, true) - - md5hash := md5.New() - // process each message - for { - // println("recv") - in, err := stream.Recv() - // glog.V(0).Infof("recieved %v err: %v", in, err) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - if in.Data == nil { - continue - } - - // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value)) - - data, err := proto.Marshal(in.Data) - if err != nil { - glog.Errorf("marshall error: %v\n", err) - continue - } - - tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs) - - if in.Data.IsClose { - // println("server received closing") - break - } - - md5hash.Write(in.Data.Value) - - } - - if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil { - glog.V(0).Infof("err writing %s: %v", md5File, err) - } - - // fmt.Printf("received md5 %X\n", md5hash.Sum(nil)) - - // send the close ack - // println("server send ack closing") - if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil { - glog.V(0).Infof("err sending close response: %v", err) - } - return nil - -} |
