aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
-rw-r--r--weed/mq/client/pub_client/scheduler.go23
1 files changed, 12 insertions, 11 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 40e8014c6..8cb481051 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -3,6 +3,12 @@ package pub_client
import (
"context"
"fmt"
+ "log"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -11,11 +17,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
- "log"
- "sort"
- "sync"
- "sync/atomic"
- "time"
)
type EachPartitionError struct {
@@ -188,10 +189,10 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
return
}
- if ackResp.AckSequence > 0 {
- log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ if ackResp.AckTsNs > 0 {
+ log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckTsNs, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
- if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckTsNs && atomic.LoadInt32(&hasMoreData) == 0 {
return
}
}
@@ -238,9 +239,9 @@ func (p *TopicPublisher) doConfigureTopic() (err error) {
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
- Topic: p.config.Topic.ToPbTopic(),
- PartitionCount: p.config.PartitionCount,
- RecordType: p.config.RecordType, // TODO schema upgrade
+ Topic: p.config.Topic.ToPbTopic(),
+ PartitionCount: p.config.PartitionCount,
+ MessageRecordType: p.config.RecordType, // Flat schema
})
return err
})