aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/mq_agent_pb
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/mq_agent_pb')
-rw-r--r--weed/pb/mq_agent_pb/mq_agent.pb.go37
-rw-r--r--weed/pb/mq_agent_pb/publish_response_test.go102
2 files changed, 135 insertions, 4 deletions
diff --git a/weed/pb/mq_agent_pb/mq_agent.pb.go b/weed/pb/mq_agent_pb/mq_agent.pb.go
index 11f1ac551..bc321e957 100644
--- a/weed/pb/mq_agent_pb/mq_agent.pb.go
+++ b/weed/pb/mq_agent_pb/mq_agent.pb.go
@@ -296,6 +296,8 @@ type PublishRecordResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
+ BaseOffset int64 `protobuf:"varint,3,opt,name=base_offset,json=baseOffset,proto3" json:"base_offset,omitempty"` // First offset assigned to this batch
+ LastOffset int64 `protobuf:"varint,4,opt,name=last_offset,json=lastOffset,proto3" json:"last_offset,omitempty"` // Last offset assigned to this batch
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -344,6 +346,20 @@ func (x *PublishRecordResponse) GetError() string {
return ""
}
+func (x *PublishRecordResponse) GetBaseOffset() int64 {
+ if x != nil {
+ return x.BaseOffset
+ }
+ return 0
+}
+
+func (x *PublishRecordResponse) GetLastOffset() int64 {
+ if x != nil {
+ return x.LastOffset
+ }
+ return 0
+}
+
// ////////////////////////////////////////////////
type SubscribeRecordRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
@@ -413,6 +429,7 @@ type SubscribeRecordResponse struct {
Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"`
IsEndOfStream bool `protobuf:"varint,6,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"`
IsEndOfTopic bool `protobuf:"varint,7,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"`
+ Offset int64 `protobuf:"varint,8,opt,name=offset,proto3" json:"offset,omitempty"` // Sequential offset within partition
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -489,6 +506,13 @@ func (x *SubscribeRecordResponse) GetIsEndOfTopic() bool {
return false
}
+func (x *SubscribeRecordResponse) GetOffset() int64 {
+ if x != nil {
+ return x.Offset
+ }
+ return 0
+}
+
type SubscribeRecordRequest_InitSubscribeRecordRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
@@ -621,10 +645,14 @@ const file_mq_agent_proto_rawDesc = "" +
"\n" +
"session_id\x18\x01 \x01(\x03R\tsessionId\x12\x10\n" +
"\x03key\x18\x02 \x01(\fR\x03key\x12,\n" +
- "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\"P\n" +
+ "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\"\x92\x01\n" +
"\x15PublishRecordResponse\x12!\n" +
"\fack_sequence\x18\x01 \x01(\x03R\vackSequence\x12\x14\n" +
- "\x05error\x18\x02 \x01(\tR\x05error\"\xfb\x04\n" +
+ "\x05error\x18\x02 \x01(\tR\x05error\x12\x1f\n" +
+ "\vbase_offset\x18\x03 \x01(\x03R\n" +
+ "baseOffset\x12\x1f\n" +
+ "\vlast_offset\x18\x04 \x01(\x03R\n" +
+ "lastOffset\"\xfb\x04\n" +
"\x16SubscribeRecordRequest\x12S\n" +
"\x04init\x18\x01 \x01(\v2?.messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequestR\x04init\x12!\n" +
"\fack_sequence\x18\x02 \x01(\x03R\vackSequence\x12\x17\n" +
@@ -641,14 +669,15 @@ const file_mq_agent_proto_rawDesc = "" +
"\x06filter\x18\n" +
" \x01(\tR\x06filter\x12:\n" +
"\x19max_subscribed_partitions\x18\v \x01(\x05R\x17maxSubscribedPartitions\x12.\n" +
- "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\"\xd4\x01\n" +
+ "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\"\xec\x01\n" +
"\x17SubscribeRecordResponse\x12\x10\n" +
"\x03key\x18\x02 \x01(\fR\x03key\x12,\n" +
"\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\x12\x13\n" +
"\x05ts_ns\x18\x04 \x01(\x03R\x04tsNs\x12\x14\n" +
"\x05error\x18\x05 \x01(\tR\x05error\x12'\n" +
"\x10is_end_of_stream\x18\x06 \x01(\bR\risEndOfStream\x12%\n" +
- "\x0fis_end_of_topic\x18\a \x01(\bR\fisEndOfTopic2\xb9\x03\n" +
+ "\x0fis_end_of_topic\x18\a \x01(\bR\fisEndOfTopic\x12\x16\n" +
+ "\x06offset\x18\b \x01(\x03R\x06offset2\xb9\x03\n" +
"\x15SeaweedMessagingAgent\x12l\n" +
"\x13StartPublishSession\x12(.messaging_pb.StartPublishSessionRequest\x1a).messaging_pb.StartPublishSessionResponse\"\x00\x12l\n" +
"\x13ClosePublishSession\x12(.messaging_pb.ClosePublishSessionRequest\x1a).messaging_pb.ClosePublishSessionResponse\"\x00\x12^\n" +
diff --git a/weed/pb/mq_agent_pb/publish_response_test.go b/weed/pb/mq_agent_pb/publish_response_test.go
new file mode 100644
index 000000000..1f2e767e4
--- /dev/null
+++ b/weed/pb/mq_agent_pb/publish_response_test.go
@@ -0,0 +1,102 @@
+package mq_agent_pb
+
+import (
+ "testing"
+ "google.golang.org/protobuf/proto"
+)
+
+func TestPublishRecordResponseSerialization(t *testing.T) {
+ // Test that PublishRecordResponse can serialize/deserialize with new offset fields
+ original := &PublishRecordResponse{
+ AckSequence: 123,
+ Error: "",
+ BaseOffset: 1000, // New field
+ LastOffset: 1005, // New field
+ }
+
+ // Test proto marshaling/unmarshaling
+ data, err := proto.Marshal(original)
+ if err != nil {
+ t.Fatalf("Failed to marshal PublishRecordResponse: %v", err)
+ }
+
+ restored := &PublishRecordResponse{}
+ err = proto.Unmarshal(data, restored)
+ if err != nil {
+ t.Fatalf("Failed to unmarshal PublishRecordResponse: %v", err)
+ }
+
+ // Verify all fields are preserved
+ if restored.AckSequence != original.AckSequence {
+ t.Errorf("AckSequence = %d, want %d", restored.AckSequence, original.AckSequence)
+ }
+ if restored.BaseOffset != original.BaseOffset {
+ t.Errorf("BaseOffset = %d, want %d", restored.BaseOffset, original.BaseOffset)
+ }
+ if restored.LastOffset != original.LastOffset {
+ t.Errorf("LastOffset = %d, want %d", restored.LastOffset, original.LastOffset)
+ }
+}
+
+func TestSubscribeRecordResponseSerialization(t *testing.T) {
+ // Test that SubscribeRecordResponse can serialize/deserialize with new offset field
+ original := &SubscribeRecordResponse{
+ Key: []byte("test-key"),
+ TsNs: 1234567890,
+ Error: "",
+ IsEndOfStream: false,
+ IsEndOfTopic: false,
+ Offset: 42, // New field
+ }
+
+ // Test proto marshaling/unmarshaling
+ data, err := proto.Marshal(original)
+ if err != nil {
+ t.Fatalf("Failed to marshal SubscribeRecordResponse: %v", err)
+ }
+
+ restored := &SubscribeRecordResponse{}
+ err = proto.Unmarshal(data, restored)
+ if err != nil {
+ t.Fatalf("Failed to unmarshal SubscribeRecordResponse: %v", err)
+ }
+
+ // Verify all fields are preserved
+ if restored.TsNs != original.TsNs {
+ t.Errorf("TsNs = %d, want %d", restored.TsNs, original.TsNs)
+ }
+ if restored.Offset != original.Offset {
+ t.Errorf("Offset = %d, want %d", restored.Offset, original.Offset)
+ }
+ if string(restored.Key) != string(original.Key) {
+ t.Errorf("Key = %s, want %s", string(restored.Key), string(original.Key))
+ }
+}
+
+func TestPublishRecordResponseBackwardCompatibility(t *testing.T) {
+ // Test that PublishRecordResponse without offset fields still works
+ original := &PublishRecordResponse{
+ AckSequence: 123,
+ Error: "",
+ // BaseOffset and LastOffset not set (defaults to 0)
+ }
+
+ data, err := proto.Marshal(original)
+ if err != nil {
+ t.Fatalf("Failed to marshal PublishRecordResponse: %v", err)
+ }
+
+ restored := &PublishRecordResponse{}
+ err = proto.Unmarshal(data, restored)
+ if err != nil {
+ t.Fatalf("Failed to unmarshal PublishRecordResponse: %v", err)
+ }
+
+ // Offset fields should default to 0
+ if restored.BaseOffset != 0 {
+ t.Errorf("BaseOffset = %d, want 0", restored.BaseOffset)
+ }
+ if restored.LastOffset != 0 {
+ t.Errorf("LastOffset = %d, want 0", restored.LastOffset)
+ }
+}