aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscriber.go
blob: 9adc5197efd9e859f5095907a886b426c50dceb6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package sub_client

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"google.golang.org/grpc"
	"time"
)

// SubscriberConfiguration groups the identity, consumer-group and connection
// settings a caller supplies when creating a TopicSubscriber.
type SubscriberConfiguration struct {
	// ClientId identifies the client.
	ClientId string
	// ConsumerGroup names the consumer group.
	ConsumerGroup string
	// ConsumerGroupInstanceId identifies one instance within the consumer group.
	ConsumerGroupInstanceId string
	// GroupMinimumPeers and GroupMaximumPeers presumably bound the size of the
	// consumer group; how they are enforced is not visible in this file.
	GroupMinimumPeers int32
	GroupMaximumPeers int32
	// BootstrapServers is a list of server addresses. NewTopicSubscriber takes
	// its broker list as a separate argument and does not read this field.
	BootstrapServers []string
	// GrpcDialOption is a gRPC dial option, presumably applied when connecting
	// to brokers (verify against the code that dials).
	GrpcDialOption grpc.DialOption
}

// ContentConfiguration groups the topic coordinates, filter and start time of
// a subscription.
type ContentConfiguration struct {
	// Namespace and Topic name what is being subscribed to.
	Namespace string
	Topic     string
	// Filter is a filter string; its syntax is not defined in this file.
	Filter string
	// StartTime is converted with UnixNano by NewTopicSubscriber to seed the
	// subscriber's already-processed timestamp.
	StartTime time.Time
}

// ProcessorConfiguration holds the processing limits of a subscriber.
type ProcessorConfiguration struct {
	// ConcurrentPartitionLimit is how many partitions to process concurrently.
	ConcurrentPartitionLimit int
}

// Callback signatures accepted by TopicSubscriber.
type (
	// OnEachMessageFunc is called with the key and value of a message. It
	// reports whether processing should continue and any error encountered;
	// how the caller reacts to either result is decided where the callback is
	// invoked, which is outside this file.
	OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)

	// OnCompletionFunc is a callback that takes no arguments and returns
	// nothing; presumably invoked once processing completes.
	OnCompletionFunc func()
)

// TopicSubscriber bundles the configuration, callbacks and progress state of
// a topic subscription. Create one with NewTopicSubscriber.
type TopicSubscriber struct {
	// Configuration sections supplied to NewTopicSubscriber. ProcessorConfig
	// points at a copy of the value that was passed in.
	SubscriberConfig *SubscriberConfiguration
	ContentConfig    *ContentConfiguration
	ProcessorConfig  *ProcessorConfiguration
	// brokerPartitionAssignments is left nil by NewTopicSubscriber; presumably
	// it is filled in elsewhere in the package.
	brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
	// Callbacks, installed with SetEachMessageFunc and SetCompletionFunc.
	OnEachMessageFunc OnEachMessageFunc
	OnCompletionFunc  OnCompletionFunc
	// bootstrapBrokers is the broker list given to NewTopicSubscriber.
	bootstrapBrokers []string
	// waitForMoreMessage is initialised to true by NewTopicSubscriber.
	waitForMoreMessage bool
	// alreadyProcessedTsNs is initialised by NewTopicSubscriber to the
	// content start time expressed in Unix nanoseconds.
	alreadyProcessedTsNs int64
}

// NewTopicSubscriber builds a TopicSubscriber from the given broker list and
// configuration sections.
//
// subscriber and content are stored as passed; processor is taken by value and
// the subscriber keeps a pointer to that copy. The returned subscriber starts
// with waitForMoreMessage set and with its already-processed timestamp seeded
// from content.StartTime. No callbacks are installed; use SetEachMessageFunc
// and SetCompletionFunc for that.
//
// content must not be nil: its StartTime is read here.
//
// NOTE(review): time.Time.UnixNano is documented as undefined for instants
// that do not fit in an int64 of nanoseconds, which includes the zero
// time.Time. Confirm that callers always set content.StartTime.
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
	startTsNs := content.StartTime.UnixNano()

	sub := new(TopicSubscriber)
	sub.SubscriberConfig = subscriber
	sub.ContentConfig = content
	sub.ProcessorConfig = &processor
	sub.bootstrapBrokers = bootstrapBrokers
	sub.waitForMoreMessage = true
	sub.alreadyProcessedTsNs = startTsNs
	return sub
}

// SetEachMessageFunc installs onEachMessageFn as the subscriber's per-message
// callback, replacing any callback set earlier.
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
	sub.OnEachMessageFunc = onEachMessageFn
}

// SetCompletionFunc installs onCompletionFn as the subscriber's completion
// callback, replacing any callback set earlier.
func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
	sub.OnCompletionFunc = onCompletionFn
}