aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/msgclient/publisher.go
blob: 823791d10d9643c066fd378bd247172fda7123a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package msgclient

import (
	"context"

	"github.com/OneOfOne/xxhash"
	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/mq/broker"
	"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)

type Publisher struct {
	publishClients     []mq_pb.SeaweedMessaging_PublishClient
	topicConfiguration *mq_pb.TopicConfiguration
	messageCount       uint64
	publisherId        string
}

func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
	// read topic configuration
	topicConfiguration := &mq_pb.TopicConfiguration{
		PartitionCount: 4,
	}
	publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
	for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
		tp := broker.TopicPartition{
			Namespace: namespace,
			Topic:     topic,
			Partition: int32(i),
		}
		grpcClientConn, err := mc.findBroker(tp)
		if err != nil {
			return nil, err
		}
		client, err := setupPublisherClient(grpcClientConn, tp)
		if err != nil {
			return nil, err
		}
		publishClients[i] = client
	}
	return &Publisher{
		publishClients:     publishClients,
		topicConfiguration: topicConfiguration,
	}, nil
}

func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {

	stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
	if err != nil {
		return nil, err
	}

	// send init message
	err = stream.Send(&mq_pb.PublishRequest{
		Init: &mq_pb.PublishRequest_InitMessage{
			Namespace: tp.Namespace,
			Topic:     tp.Topic,
			Partition: tp.Partition,
		},
	})
	if err != nil {
		return nil, err
	}

	// process init response
	initResponse, err := stream.Recv()
	if err != nil {
		return nil, err
	}
	if initResponse.Redirect != nil {
		// TODO follow redirection
	}
	if initResponse.Config != nil {
	}

	// setup looks for control messages
	doneChan := make(chan error, 1)
	go func() {
		for {
			in, err := stream.Recv()
			if err != nil {
				doneChan <- err
				return
			}
			if in.Redirect != nil {
			}
			if in.Config != nil {
			}
		}
	}()

	return stream, nil

}

func (p *Publisher) Publish(m *mq_pb.Message) error {
	hashValue := p.messageCount
	p.messageCount++
	if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
		if m.Key != nil {
			hashValue = xxhash.Checksum64(m.Key)
		}
	} else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
		hashValue = xxhash.Checksum64(m.Key)
	} else {
		// round robin
	}

	idx := int(hashValue) % len(p.publishClients)
	if idx < 0 {
		idx += len(p.publishClients)
	}
	return p.publishClients[idx].Send(&mq_pb.PublishRequest{
		Data: m,
	})
}