diff options
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 40e8014c6..8cb481051 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -3,6 +3,12 @@ package pub_client import ( "context" "fmt" + "log" + "sort" + "sync" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -11,11 +17,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "log" - "sort" - "sync" - "sync/atomic" - "time" ) type EachPartitionError struct { @@ -188,10 +189,10 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) return } - if ackResp.AckSequence > 0 { - log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) + if ackResp.AckTsNs > 0 { + log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckTsNs, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } - if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckTsNs && atomic.LoadInt32(&hasMoreData) == 0 { return } } @@ -238,9 +239,9 @@ func (p *TopicPublisher) doConfigureTopic() (err error) { p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ - Topic: p.config.Topic.ToPbTopic(), - PartitionCount: p.config.PartitionCount, - RecordType: p.config.RecordType, // TODO schema upgrade + Topic: p.config.Topic.ToPbTopic(), + PartitionCount: p.config.PartitionCount, + MessageRecordType: p.config.RecordType, // Flat schema }) return err }) |
