aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client')
-rw-r--r--weed/mq/client/pub_client/scheduler.go23
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go13
-rw-r--r--weed/mq/client/sub_client/subscribe.go11
-rw-r--r--weed/mq/client/sub_client/subscriber.go7
4 files changed, 29 insertions, 25 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 40e8014c6..8cb481051 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -3,6 +3,12 @@ package pub_client
import (
"context"
"fmt"
+ "log"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -11,11 +17,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
- "log"
- "sort"
- "sync"
- "sync/atomic"
- "time"
)
type EachPartitionError struct {
@@ -188,10 +189,10 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
return
}
- if ackResp.AckSequence > 0 {
- log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ if ackResp.AckTsNs > 0 {
+ log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckTsNs, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
- if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckTsNs && atomic.LoadInt32(&hasMoreData) == 0 {
return
}
}
@@ -238,9 +239,9 @@ func (p *TopicPublisher) doConfigureTopic() (err error) {
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
- Topic: p.config.Topic.ToPbTopic(),
- PartitionCount: p.config.PartitionCount,
- RecordType: p.config.RecordType, // TODO schema upgrade
+ Topic: p.config.Topic.ToPbTopic(),
+ PartitionCount: p.config.PartitionCount,
+ MessageRecordType: p.config.RecordType, // Flat schema
})
return err
})
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index b6d6e90b5..470e886d2 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "io"
)
-type KeyedOffset struct {
- Key []byte
- Offset int64
+type KeyedTimestamp struct {
+ Key []byte
+ TsNs int64 // Timestamp in nanoseconds for acknowledgment
}
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error {
@@ -78,8 +79,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Key: ack.Key,
- Sequence: ack.Offset,
+ Key: ack.Key,
+ TsNs: ack.TsNs,
},
},
})
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index d4dea3852..0f3f9b5ee 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,12 +1,13 @@
package sub_client
import (
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "sync"
- "time"
)
type ProcessorState struct {
@@ -75,9 +76,9 @@ func (sub *TopicSubscriber) startProcessors() {
if sub.OnDataMessageFunc != nil {
sub.OnDataMessageFunc(m)
}
- sub.PartitionOffsetChan <- KeyedOffset{
- Key: m.Data.Key,
- Offset: m.Data.TsNs,
+ sub.PartitionOffsetChan <- KeyedTimestamp{
+ Key: m.Data.Key,
+ TsNs: m.Data.TsNs,
}
})
}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index ec15d998e..68bf74c5e 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -2,11 +2,12 @@ package sub_client
import (
"context"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc"
- "sync"
)
type SubscriberConfiguration struct {
@@ -44,10 +45,10 @@ type TopicSubscriber struct {
bootstrapBrokers []string
activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex
- PartitionOffsetChan chan KeyedOffset
+ PartitionOffsetChan chan KeyedTimestamp
}
-func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber {
+func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedTimestamp) *TopicSubscriber {
return &TopicSubscriber{
ctx: ctx,
SubscriberConfig: subscriber,