aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/client/pub_client/publish.go12
-rw-r--r--weed/mq/client/pub_client/publisher.go2
-rw-r--r--weed/mq/client/pub_client/scheduler.go1
3 files changed, 15 insertions, 0 deletions
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index fbb07b042..92a1a1599 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -2,8 +2,10 @@ package pub_client
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"time"
)
@@ -25,6 +27,16 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
})
}
+func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.RecordValue) error {
+ // serialize record value
+ value, err := proto.Marshal(recordValue)
+ if err != nil {
+ return fmt.Errorf("failed to marshal record value: %v", err)
+ }
+
+ return p.Publish(key, value)
+}
+
func (p *TopicPublisher) FinishPublish() error {
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers {
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 4dfce4030..04c2ebe08 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -17,6 +18,7 @@ type PublisherConfiguration struct {
PartitionCount int32
Brokers []string
PublisherName string // for debugging
+ RecordType *schema_pb.RecordType
}
type PublishClient struct {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index a8d7079c9..c32343e0d 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -236,6 +236,7 @@ func (p *TopicPublisher) doConfigureTopic() (err error) {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
Topic: p.config.Topic.ToPbTopic(),
PartitionCount: p.config.PartitionCount,
+ RecordType: p.config.RecordType, // TODO schema upgrade
})
return err
})