diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-22 01:12:32 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-28 23:24:38 -0700 |
| commit | 06cd491abc82a6b799c47ae600a4bb81d24091ce (patch) | |
| tree | ce2fe18c31c54513413535c1c491cea1eca16afd /weed/pb | |
| parent | 7576c244c47113b452681967b741ece5225767e3 (diff) | |
| download | seaweedfs-06cd491abc82a6b799c47ae600a4bb81d24091ce.tar.xz seaweedfs-06cd491abc82a6b799c47ae600a4bb81d24091ce.zip | |
add message batch
Diffstat (limited to 'weed/pb')
| -rw-r--r-- | weed/pb/message.fbs | 26 | ||||
| -rw-r--r-- | weed/pb/message_fbs/Message.go | 92 | ||||
| -rw-r--r-- | weed/pb/message_fbs/MessageBatch.go | 187 |
3 files changed, 220 insertions, 85 deletions
diff --git a/weed/pb/message.fbs b/weed/pb/message.fbs index 8ee6c5b55..170551df7 100644 --- a/weed/pb/message.fbs +++ b/weed/pb/message.fbs @@ -3,13 +3,21 @@ table NameValue { value:string; } table Message { - producer_id:int32 (id:0); - producer_seq:int64 (id:2); - segment_id:int32 (id:1); - segment_seq:int64 (id:3); - event_ts_ns:int64 (id:4); - recv_ts_ns:int64 (id:5); - properties:[NameValue] (id:6); - key:string (id:7); // bytes - data:string (id:8); // bytes + seq_delta:int32 (id:0); + ts_ms_delta:int32 (id:1); + properties:[NameValue] (id:2); + key:string (id:3); // bytes + data:string (id:4); // bytes +} + +table MessageBatch { + producer_id:int32 (id:0); + producer_epoch:int32 (id:1); + segment_id:int32 (id:2); + flags: int32 (id:3); + segment_seq_base:int64 (id:4); + segment_seq_max_delta:int32 (id:5); + ts_ms_base:int64 (id:6); + ts_ms_max_delta:int32 (id:7); + messages: [Message] (id:8); } diff --git a/weed/pb/message_fbs/Message.go b/weed/pb/message_fbs/Message.go index 058abfeaa..e9ef83616 100644 --- a/weed/pb/message_fbs/Message.go +++ b/weed/pb/message_fbs/Message.go @@ -33,7 +33,7 @@ func (rcv *Message) Table() flatbuffers.Table { return rcv._tab } -func (rcv *Message) ProducerId() int32 { +func (rcv *Message) SeqDelta() int32 { o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) if o != 0 { return rcv._tab.GetInt32(o + rcv._tab.Pos) @@ -41,11 +41,11 @@ func (rcv *Message) ProducerId() int32 { return 0 } -func (rcv *Message) MutateProducerId(n int32) bool { +func (rcv *Message) MutateSeqDelta(n int32) bool { return rcv._tab.MutateInt32Slot(4, n) } -func (rcv *Message) SegmentId() int32 { +func (rcv *Message) TsMsDelta() int32 { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) if o != 0 { return rcv._tab.GetInt32(o + rcv._tab.Pos) @@ -53,60 +53,12 @@ func (rcv *Message) SegmentId() int32 { return 0 } -func (rcv *Message) MutateSegmentId(n int32) bool { +func (rcv *Message) MutateTsMsDelta(n int32) bool { return rcv._tab.MutateInt32Slot(6, n) } -func (rcv *Message) ProducerSeq() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateProducerSeq(n int64) bool { - return rcv._tab.MutateInt64Slot(8, n) -} - -func (rcv *Message) SegmentSeq() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateSegmentSeq(n int64) bool { - return rcv._tab.MutateInt64Slot(10, n) -} - -func (rcv *Message) EventTsNs() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateEventTsNs(n int64) bool { - return rcv._tab.MutateInt64Slot(12, n) -} - -func (rcv *Message) RecvTsNs() int64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) - if o != 0 { - return rcv._tab.GetInt64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *Message) MutateRecvTsNs(n int64) bool { - return rcv._tab.MutateInt64Slot(14, n) -} - func (rcv *Message) Properties(obj *NameValue, j int) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { x := rcv._tab.Vector(o) x += flatbuffers.UOffsetT(j) * 4 @@ -118,7 +70,7 @@ func (rcv *Message) Properties(obj *NameValue, j int) bool { } func (rcv *Message) PropertiesLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.VectorLen(o) } @@ -126,7 +78,7 @@ func (rcv *Message) PropertiesLength() int { } func (rcv *Message) Key() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -134,7 +86,7 @@ func (rcv *Message) Key() []byte { } func (rcv *Message) Data() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -142,37 +94,25 @@ func (rcv *Message) Data() []byte { } func MessageStart(builder *flatbuffers.Builder) { - builder.StartObject(9) -} -func MessageAddProducerId(builder *flatbuffers.Builder, producerId int32) { - builder.PrependInt32Slot(0, producerId, 0) -} -func MessageAddSegmentId(builder *flatbuffers.Builder, segmentId int32) { - builder.PrependInt32Slot(1, segmentId, 0) -} -func MessageAddProducerSeq(builder *flatbuffers.Builder, producerSeq int64) { - builder.PrependInt64Slot(2, producerSeq, 0) -} -func MessageAddSegmentSeq(builder *flatbuffers.Builder, segmentSeq int64) { - builder.PrependInt64Slot(3, segmentSeq, 0) + builder.StartObject(5) } -func MessageAddEventTsNs(builder *flatbuffers.Builder, eventTsNs int64) { - builder.PrependInt64Slot(4, eventTsNs, 0) +func MessageAddSeqDelta(builder *flatbuffers.Builder, seqDelta int32) { + builder.PrependInt32Slot(0, seqDelta, 0) } -func MessageAddRecvTsNs(builder *flatbuffers.Builder, recvTsNs int64) { - builder.PrependInt64Slot(5, recvTsNs, 0) +func MessageAddTsMsDelta(builder *flatbuffers.Builder, tsMsDelta int32) { + builder.PrependInt32Slot(1, tsMsDelta, 0) } func MessageAddProperties(builder *flatbuffers.Builder, properties flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(properties), 0) + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(properties), 0) } func MessageStartPropertiesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) } func MessageAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(key), 0) + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(key), 0) } func MessageAddData(builder *flatbuffers.Builder, data flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(data), 0) + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(data), 0) } func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/weed/pb/message_fbs/MessageBatch.go b/weed/pb/message_fbs/MessageBatch.go new file mode 100644 index 000000000..19d6a4816 --- /dev/null +++ b/weed/pb/message_fbs/MessageBatch.go @@ -0,0 +1,187 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message_fbs + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type MessageBatch struct { + _tab flatbuffers.Table +} + +func GetRootAsMessageBatch(buf []byte, offset flatbuffers.UOffsetT) *MessageBatch { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &MessageBatch{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessageBatch(buf []byte, offset flatbuffers.UOffsetT) *MessageBatch { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &MessageBatch{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *MessageBatch) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *MessageBatch) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *MessageBatch) ProducerId() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateProducerId(n int32) bool { + return rcv._tab.MutateInt32Slot(4, n) +} + +func (rcv *MessageBatch) ProducerEpoch() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateProducerEpoch(n int32) bool { + return rcv._tab.MutateInt32Slot(6, n) +} + +func (rcv *MessageBatch) SegmentId() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateSegmentId(n int32) bool { + return rcv._tab.MutateInt32Slot(8, n) +} + +func (rcv *MessageBatch) Flags() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateFlags(n int32) bool { + return rcv._tab.MutateInt32Slot(10, n) +} + +func (rcv *MessageBatch) SegmentSeqBase() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateSegmentSeqBase(n int64) bool { + return rcv._tab.MutateInt64Slot(12, n) +} + +func (rcv *MessageBatch) SegmentSeqMaxDelta() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateSegmentSeqMaxDelta(n int32) bool { + return rcv._tab.MutateInt32Slot(14, n) +} + +func (rcv *MessageBatch) TsMsBase() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateTsMsBase(n int64) bool { + return rcv._tab.MutateInt64Slot(16, n) +} + +func (rcv *MessageBatch) TsMsMaxDelta() int32 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *MessageBatch) MutateTsMsMaxDelta(n int32) bool { + return rcv._tab.MutateInt32Slot(18, n) +} + +func (rcv *MessageBatch) Messages(obj *Message, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *MessageBatch) MessagesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(20)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MessageBatchStart(builder *flatbuffers.Builder) { + builder.StartObject(9) +} +func MessageBatchAddProducerId(builder *flatbuffers.Builder, producerId int32) { + builder.PrependInt32Slot(0, producerId, 0) +} +func MessageBatchAddProducerEpoch(builder *flatbuffers.Builder, producerEpoch int32) { + builder.PrependInt32Slot(1, producerEpoch, 0) +} +func MessageBatchAddSegmentId(builder *flatbuffers.Builder, segmentId int32) { + builder.PrependInt32Slot(2, segmentId, 0) +} +func MessageBatchAddFlags(builder *flatbuffers.Builder, flags int32) { + builder.PrependInt32Slot(3, flags, 0) +} +func MessageBatchAddSegmentSeqBase(builder *flatbuffers.Builder, segmentSeqBase int64) { + builder.PrependInt64Slot(4, segmentSeqBase, 0) +} +func MessageBatchAddSegmentSeqMaxDelta(builder *flatbuffers.Builder, segmentSeqMaxDelta int32) { + builder.PrependInt32Slot(5, segmentSeqMaxDelta, 0) +} +func MessageBatchAddTsMsBase(builder *flatbuffers.Builder, tsMsBase int64) { + builder.PrependInt64Slot(6, tsMsBase, 0) +} +func MessageBatchAddTsMsMaxDelta(builder *flatbuffers.Builder, tsMsMaxDelta int32) { + builder.PrependInt32Slot(7, tsMsMaxDelta, 0) +} +func MessageBatchAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(8, flatbuffers.UOffsetT(messages), 0) +} +func MessageBatchStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessageBatchEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} |
