aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_pub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go13
1 files changed, 13 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index cd072503c..3521a0df2 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -12,7 +12,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc/peer"
+ "google.golang.org/protobuf/proto"
)
// PUB
@@ -140,6 +142,16 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
continue
}
+ // Basic validation: ensure message can be unmarshaled as RecordValue
+ if dataMessage.Value != nil {
+ record := &schema_pb.RecordValue{}
+ if err := proto.Unmarshal(dataMessage.Value, record); err == nil {
+ } else {
+ // If unmarshaling fails, we skip validation but log a warning
+ glog.V(1).Infof("Could not unmarshal RecordValue for validation on topic %v partition %v: %v", initMessage.Topic, initMessage.Partition, err)
+ }
+ }
+
// The control message should still be sent to the follower
// to avoid timing issue when ack messages.
@@ -171,3 +183,4 @@ func findClientAddress(ctx context.Context) string {
}
return pr.Addr.String()
}
+