aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/on_each_partition.go
blob: fbfcc3c6b1ae589fdfd36349ba570edd04c4bffb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package sub_client

import (
	"context"
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"io"
	"reflect"
	"time"
)

type KeyedOffset struct {
	Key    []byte
	Offset int64
}

func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error {
	// connect to the partition broker
	return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {

		subscribeClient, err := client.SubscribeMessage(context.Background())
		if err != nil {
			return fmt.Errorf("create subscribe client: %v", err)
		}

		slidingWindowSize := sub.SubscriberConfig.SlidingWindowSize
		if slidingWindowSize <= 0 {
			slidingWindowSize = 1
		}

		po := findPartitionOffset(sub.ContentConfig.PartitionOffsets, assigned.Partition)
		if po == nil {
			po = &schema_pb.PartitionOffset{
				Partition: assigned.Partition,
				StartTsNs: time.Now().UnixNano(),
				StartType: schema_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
			}
		}

		if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
			Message: &mq_pb.SubscribeMessageRequest_Init{
				Init: &mq_pb.SubscribeMessageRequest_InitMessage{
					ConsumerGroup:     sub.SubscriberConfig.ConsumerGroup,
					ConsumerId:        sub.SubscriberConfig.ConsumerGroupInstanceId,
					Topic:             sub.ContentConfig.Topic.ToPbTopic(),
					PartitionOffset:   po,
					Filter:            sub.ContentConfig.Filter,
					FollowerBroker:    assigned.FollowerBroker,
					SlidingWindowSize: slidingWindowSize,
				},
			},
		}); err != nil {
			glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
		}

		glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)

		if sub.OnCompletionFunc != nil {
			defer sub.OnCompletionFunc()
		}

		go func() {
			for {
				select {
				case <-stopCh:
					subscribeClient.CloseSend()
					return
				case ack, ok := <-sub.PartitionOffsetChan:
					if !ok {
						subscribeClient.CloseSend()
						return
					}
					subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
						Message: &mq_pb.SubscribeMessageRequest_Ack{
							Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
								Key:      ack.Key,
								Sequence: ack.Offset,
							},
						},
					})
				}
			}
		}()

		for {
			// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
			resp, err := subscribeClient.Recv()
			if err != nil {
				return fmt.Errorf("subscribe recv: %v", err)
			}
			if resp.Message == nil {
				glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
				continue
			}
			switch m := resp.Message.(type) {
			case *mq_pb.SubscribeMessageResponse_Data:
				if m.Data.Ctrl != nil {
					glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
					continue
				}
				if len(m.Data.Key) == 0 {
					fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
					continue
				}
				onDataMessageFn(m)
			case *mq_pb.SubscribeMessageResponse_Ctrl:
				// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
				if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
					return io.EOF
				}
			}
		}

	})
}

func findPartitionOffset(partitionOffsets []*schema_pb.PartitionOffset, partition *schema_pb.Partition) *schema_pb.PartitionOffset {
	for _, po := range partitionOffsets {
		if po.Partition == partition {
			return po
		}
	}
	return nil
}