aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_grpc_server_publish.go
blob: 515c70b9688110d679eee17fd9478adb344ca0c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package broker

import (
	"crypto/md5"
	"fmt"
	"io"

	"github.com/golang/protobuf/proto"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/util/log"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)

// Publish serves one client publish stream for a single topic partition.
//
// Protocol, as implemented here:
//  1. The first message received must carry Init, which names the namespace,
//     topic and partition being published to. An init response (currently with
//     no Config and no Redirect) is sent back.
//  2. If the partition's md5 marker file ("pNN.md5" under the topic directory)
//     already exists, the call fails with "channel is already closed".
//  3. Every following message that has Data is marshaled and added to the
//     partition's log buffer, and its Value bytes are folded into a running
//     MD5 digest.
//  4. A message whose Data.IsClose is set ends the stream: the digest is
//     appended to the md5 marker file and a response with IsClosed=true is
//     sent to the client.
//
// It returns nil when the client ends the stream (io.EOF) or after the close
// handshake, the stream error if Recv/Send of the init exchange fails, and an
// error if the first message has no Init payload. Failures while writing the
// md5 file or sending the close ack are only logged.
func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {

	// process initial request
	in, err := stream.Recv()
	if err == io.EOF {
		return nil
	}
	if err != nil {
		return err
	}
	// Guard against a client that starts the stream without an init message;
	// dereferencing in.Init below would otherwise panic the handler.
	if in.Init == nil {
		return fmt.Errorf("missing init message")
	}

	// TODO look it up
	topicConfig := &messaging_pb.TopicConfiguration{
		// IsTransient: true,
	}

	// send init response
	initResponse := &messaging_pb.PublishResponse{
		Config:   nil,
		Redirect: nil,
	}
	if err := stream.Send(initResponse); err != nil {
		return err
	}
	// Redirect is never populated yet, so this branch is a placeholder for
	// when the broker can point the client at another broker.
	if initResponse.Redirect != nil {
		return nil
	}

	// get lock
	tp := TopicPartition{
		Namespace: in.Init.Namespace,
		Topic:     in.Init.Topic,
		Partition: in.Init.Partition,
	}

	tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
	md5File := fmt.Sprintf("p%02d.md5", tp.Partition)

	// The md5 file is only written after a close message, so its presence
	// marks the partition as closed. A lookup error is treated as "not closed".
	if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
		return fmt.Errorf("channel is already closed")
	}

	tl := broker.topicManager.RequestLock(tp, topicConfig, true)
	defer broker.topicManager.ReleaseLock(tp, true)

	md5hash := md5.New()
	// process each message
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		if req.Data == nil {
			continue
		}

		data, err := proto.Marshal(req.Data)
		if err != nil {
			log.Errorf("marshall error: %v\n", err)
			continue
		}

		// The close message itself is also recorded in the log buffer, but
		// its Value is not included in the digest.
		tl.logBuffer.AddToBuffer(req.Data.Key, data, req.Data.EventTimeNs)

		if req.Data.IsClose {
			break
		}

		md5hash.Write(req.Data.Value)

	}

	if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
		log.Infof("err writing %s: %v", md5File, err)
	}

	// send the close ack
	if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
		log.Infof("err sending close response: %v", err)
	}
	return nil

}