diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/README.md | 66 | ||||
| -rw-r--r-- | weed/mq/segment/message_serde.go | 48 | ||||
| -rw-r--r-- | weed/mq/segment/message_serde_test.go | 47 | ||||
| -rw-r--r-- | weed/mq/segment/segment_serde.go | 1 |
4 files changed, 162 insertions, 0 deletions
diff --git a/weed/mq/README.md b/weed/mq/README.md new file mode 100644 index 000000000..2f1127869 --- /dev/null +++ b/weed/mq/README.md @@ -0,0 +1,66 @@ +# SeaweedMQ Message Queue on SeaweedFS (WIP, not ready) + +## What are the use cases it is designed for? + +Message queues are like water pipes. Messages flow in the pipes to their destinations. + +However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart, +and watch the traffic level closely. + +Sometimes the flood is expected. For example, backfill some old data in batch, and switch to online messages. +You may want to ensure enough brokers to handle the data and reduce them later to cut cost. + +SeaweedMQ is designed for use cases that need to: +* Receive and save a large number of messages. +* Handle spike traffic automatically. + +## What is special about SeaweedMQ? + +* Separate computation and storage nodes that scale independently. + * Offline messages can be operated as normal files. + * Unlimited storage space by adding volume servers. + * Unlimited message brokers. +* Scale up and down with auto split and merge message topics. + * Topics can automatically split into segments when traffic increases, and vice versa. + * + +# Design + +# How does it work? + +Brokers are just computation nodes without storage. When a broker starts, it reports itself to masters. +Among all the brokers, one of them will be selected as the leader by the masters. + +A topic needs to define its partition key on its messages. + +Messages for a topic are divided into segments. + +During write time, the client will ask the broker leader for a few brokers to process the segment. + +The broker leader will check whether the segment already has brokers assigned. If not, select a few based +on their loads, save the selection into filer, and tell the client. + +The client will write the messages for this segment to the selected brokers. 
+ +## Failover + +The broker leader does not contain any state. If it fails, the masters will select a different broker. + +For a segment, if any one of the selected brokers is down, the remaining brokers should try to write received messages +to the filer, and close the segment to the clients. + +Then the clients should start a new segment. The masters should assign other healthy brokers to handle the new segment. + +So any broker can go down without losing data. + +## Auto Split or Merge + +(The idea is learned from Pravega.) + +The brokers should report their traffic load to the broker leader periodically. + +If any segment has too much load, the broker leader will ask the brokers to tell the client to +close the current one and create two new segments. + +If 2 neighboring segments have the combined load below the average load per segment, the broker leader will ask +the brokers to tell the client to close these 2 segments and create a new segment. diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go new file mode 100644 index 000000000..b69d78cab --- /dev/null +++ b/weed/mq/segment/message_serde.go @@ -0,0 +1,48 @@ +package segment + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/message_fbs" + flatbuffers "github.com/google/flatbuffers/go" +) + +func CreateMessage(b *flatbuffers.Builder, producerId int32, producerSeq int64, segmentId int32, segmentSeq int64, + eventTsNs int64, recvTsNs int64, properties map[string]string, key []byte, value []byte) { + b.Reset() + + var names, values, pairs []flatbuffers.UOffsetT + for k, v := range properties { + names = append(names, b.CreateString(k)) + values = append(values, b.CreateString(v)) + } + + for i, _ := range names { + message_fbs.NameValueStart(b) + message_fbs.NameValueAddName(b, names[i]) + message_fbs.NameValueAddValue(b, values[i]) + pair := message_fbs.NameValueEnd(b) + pairs = append(pairs, pair) + } + message_fbs.MessageStartPropertiesVector(b, len(properties)) + for i := len(pairs) - 1; i 
>= 0; i-- { + b.PrependUOffsetT(pairs[i]) + } + prop := b.EndVector(len(properties)) + + k := b.CreateByteVector(key) + v := b.CreateByteVector(value) + + message_fbs.MessageStart(b) + message_fbs.MessageAddProducerId(b, producerId) + message_fbs.MessageAddProducerSeq(b, producerSeq) + message_fbs.MessageAddSegmentId(b, segmentId) + message_fbs.MessageAddSegmentSeq(b, segmentSeq) + message_fbs.MessageAddEventTsNs(b, eventTsNs) + message_fbs.MessageAddRecvTsNs(b, recvTsNs) + + message_fbs.MessageAddProperties(b, prop) + message_fbs.MessageAddKey(b, k) + message_fbs.MessageAddData(b, v) + message := message_fbs.MessageEnd(b) + + b.Finish(message) +} diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go new file mode 100644 index 000000000..7ba0febf0 --- /dev/null +++ b/weed/mq/segment/message_serde_test.go @@ -0,0 +1,47 @@ +package segment + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/message_fbs" + flatbuffers "github.com/google/flatbuffers/go" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMessageSerde(t *testing.T) { + b := flatbuffers.NewBuilder(1024) + + prop := make(map[string]string) + prop["n1"] = "v1" + prop["n2"] = "v2" + + CreateMessage(b, 1, 2, 3, 4, 5, 6, prop, + []byte("the primary key"), []byte("body is here")) + + buf := b.FinishedBytes() + + println("serialized size", len(buf)) + + m := message_fbs.GetRootAsMessage(buf, 0) + + assert.Equal(t, int32(1), m.ProducerId()) + assert.Equal(t, int64(2), m.ProducerSeq()) + assert.Equal(t, int32(3), m.SegmentId()) + assert.Equal(t, int64(4), m.SegmentSeq()) + assert.Equal(t, int64(5), m.EventTsNs()) + assert.Equal(t, int64(6), m.RecvTsNs()) + + assert.Equal(t, 2, m.PropertiesLength()) + nv := &message_fbs.NameValue{} + m.Properties(nv, 0) + assert.Equal(t, "n1", string(nv.Name())) + assert.Equal(t, "v1", string(nv.Value())) + m.Properties(nv, 1) + assert.Equal(t, "n2", string(nv.Name())) + assert.Equal(t, "v2", string(nv.Value())) + 
assert.Equal(t, []byte("the primary key"), m.Key()) + assert.Equal(t, []byte("body is here"), m.Data()) + + m.MutateSegmentSeq(123) + assert.Equal(t, int64(123), m.SegmentSeq()) + +} diff --git a/weed/mq/segment/segment_serde.go b/weed/mq/segment/segment_serde.go new file mode 100644 index 000000000..e076271d6 --- /dev/null +++ b/weed/mq/segment/segment_serde.go @@ -0,0 +1 @@ +package segment |
