diff options
Diffstat (limited to 'weed/mq/client')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 23 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 13 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 11 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 7 |
4 files changed, 29 insertions, 25 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 40e8014c6..8cb481051 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -3,6 +3,12 @@ package pub_client import ( "context" "fmt" + "log" + "sort" + "sync" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -11,11 +17,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "log" - "sort" - "sync" - "sync/atomic" - "time" ) type EachPartitionError struct { @@ -188,10 +189,10 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) return } - if ackResp.AckSequence > 0 { - log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) + if ackResp.AckTsNs > 0 { + log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckTsNs, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } - if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckTsNs && atomic.LoadInt32(&hasMoreData) == 0 { return } } @@ -238,9 +239,9 @@ func (p *TopicPublisher) doConfigureTopic() (err error) { p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ - Topic: p.config.Topic.ToPbTopic(), - PartitionCount: p.config.PartitionCount, - RecordType: p.config.RecordType, // TODO schema upgrade + Topic: p.config.Topic.ToPbTopic(), + PartitionCount: p.config.PartitionCount, + MessageRecordType: p.config.RecordType, // Flat schema }) return err }) diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index b6d6e90b5..470e886d2 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -4,16 +4,17 @@ import ( "context" "errors" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "io" ) -type KeyedOffset struct { - Key []byte - Offset int64 +type KeyedTimestamp struct { + Key []byte + TsNs int64 // Timestamp in nanoseconds for acknowledgment } func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error { @@ -78,8 +79,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Key: ack.Key, - Sequence: ack.Offset, + Key: ack.Key, + TsNs: ack.TsNs, }, }, }) diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index d4dea3852..0f3f9b5ee 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,12 +1,13 @@ package sub_client import ( + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "sync" - "time" ) type ProcessorState struct { @@ -75,9 +76,9 @@ func (sub *TopicSubscriber) startProcessors() { if sub.OnDataMessageFunc != nil { sub.OnDataMessageFunc(m) } - sub.PartitionOffsetChan <- KeyedOffset{ - Key: m.Data.Key, - Offset: m.Data.TsNs, + sub.PartitionOffsetChan <- KeyedTimestamp{ + Key: m.Data.Key, + TsNs: m.Data.TsNs, } }) } diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index ec15d998e..68bf74c5e 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -2,11 +2,12 @@ package sub_client import ( "context" + "sync" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" - "sync" ) type SubscriberConfiguration struct { @@ -44,10 +45,10 @@ type TopicSubscriber struct { bootstrapBrokers []string activeProcessors map[topic.Partition]*ProcessorState activeProcessorsLock sync.Mutex - PartitionOffsetChan chan KeyedOffset + PartitionOffsetChan chan KeyedTimestamp } -func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { +func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedTimestamp) *TopicSubscriber { return &TopicSubscriber{ ctx: ctx, SubscriberConfig: subscriber, |
