diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index cd072503c..3521a0df2 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -12,7 +12,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc/peer" + "google.golang.org/protobuf/proto" ) // PUB @@ -140,6 +142,16 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis continue } + // Basic validation: ensure message can be unmarshaled as RecordValue + if dataMessage.Value != nil { + record := &schema_pb.RecordValue{} + if err := proto.Unmarshal(dataMessage.Value, record); err == nil { + } else { + // If unmarshaling fails, we skip validation but log a warning + glog.V(1).Infof("Could not unmarshal RecordValue for validation on topic %v partition %v: %v", initMessage.Topic, initMessage.Partition, err) + } + } + // The control message should still be sent to the follower // to avoid timing issue when ack messages. @@ -171,3 +183,4 @@ func findClientAddress(ctx context.Context) string { } return pr.Addr.String() } + |
