aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_server_publish.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_server_publish.go')
-rw-r--r--weed/mq/broker/broker_grpc_server_publish.go112
1 files changed, 0 insertions, 112 deletions
diff --git a/weed/mq/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go
deleted file mode 100644
index eb76dd5dc..000000000
--- a/weed/mq/broker/broker_grpc_server_publish.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package broker
-
-import (
- "crypto/md5"
- "fmt"
- "io"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- // TODO look it up
- topicConfig := &mq_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // send init response
- initResponse := &mq_pb.PublishResponse{
- Config: nil,
- Redirect: nil,
- }
- err = stream.Send(initResponse)
- if err != nil {
- return err
- }
- if initResponse.Redirect != nil {
- return nil
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
-
- tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
- md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
- // println("chan data stored under", tpDir, "as", md5File)
-
- if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
- return fmt.Errorf("channel is already closed")
- }
-
- tl := broker.topicManager.RequestLock(tp, topicConfig, true)
- defer broker.topicManager.ReleaseLock(tp, true)
-
- md5hash := md5.New()
- // process each message
- for {
- // println("recv")
- in, err := stream.Recv()
- // glog.V(0).Infof("recieved %v err: %v", in, err)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- if in.Data == nil {
- continue
- }
-
- // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
-
- data, err := proto.Marshal(in.Data)
- if err != nil {
- glog.Errorf("marshall error: %v\n", err)
- continue
- }
-
- tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
-
- if in.Data.IsClose {
- // println("server received closing")
- break
- }
-
- md5hash.Write(in.Data.Value)
-
- }
-
- if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
- glog.V(0).Infof("err writing %s: %v", md5File, err)
- }
-
- // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
-
- // send the close ack
- // println("server send ack closing")
- if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil {
- glog.V(0).Infof("err sending close response: %v", err)
- }
- return nil
-
-}