diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/README.md | 73 | ||||
| -rw-r--r-- | weed/mq/broker.go | 12 | ||||
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 207 | ||||
| -rw-r--r-- | weed/mq/broker/brokder_grpc_pub.go | 16 | ||||
| -rw-r--r-- | weed/mq/broker/broker_segment_serde.go | 89 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 109 | ||||
| -rw-r--r-- | weed/mq/segment/message_serde.go | 109 | ||||
| -rw-r--r-- | weed/mq/segment/message_serde_test.go | 56 | ||||
| -rw-r--r-- | weed/mq/segment/segment_serde.go | 1 | ||||
| -rw-r--r-- | weed/mq/topic.go | 62 |
10 files changed, 734 insertions, 0 deletions
diff --git a/weed/mq/README.md b/weed/mq/README.md new file mode 100644 index 000000000..6a641c009 --- /dev/null +++ b/weed/mq/README.md @@ -0,0 +1,73 @@ +# SeaweedMQ Message Queue on SeaweedFS (WIP, not ready) + +## What are the use cases it is designed for? + +Message queues are like water pipes. Messages flow in the pipes to their destinations. + +However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart, +and watch the traffic level closely. + +Sometimes the flood is expected. For example, backfill some old data in batch, and switch to online messages. +You may want to ensure enough brokers to handle the data and reduce them later to cut cost. + +SeaweedMQ is designed for use cases that need to: +* Receive and save large number of messages. +* Handle spike traffic automatically. + +## What is special about SeaweedMQ? + +* Separate computation and storage nodes to scale independently. + * Unlimited storage space by adding volume servers. + * Unlimited message brokers to handle incoming messages. + * Offline messages can be operated as normal files. +* Scale up and down with auto split and merge message topics. + * Topics can automatically split into segments when traffic increases, and vice verse. +* Pass messages by reference instead of copying. + * Clients can optionally upload the messages first and just submit the references. + * Drastically reduce the broker load. +* Stateless brokers + * All brokers are equal. One broker is dynamically picked as the leader. + * Add brokers at any time. + * Allow rolling restart brokers or remove brokers at a pace. + +# Design + +# How it works? + +Brokers are just computation nodes without storage. When a broker starts, it reports itself to masters. +Among all the brokers, one of them will be selected as the leader by the masters. + +A topic needs to define its partition key on its messages. + +Messages for a topic are divided into segments. One segment can cover a range of partitions. A segment can +be split into 2 segments, or 2 neighboring segments can be merged back to one segment. + +During write time, the client will ask the broker leader for a few brokers to process the segment. + +The broker leader will check whether the segment already has assigned the brokers. If not, select a few brokers based +on their loads, save the selection into filer, and tell the client. + +The client will write the messages for this segment to the selected brokers. + +## Failover + +The broker leader does not contain any state. If it fails, the masters will select a different broker. + +For a segment, if any one of the selected brokers is down, the remaining brokers should try to write received messages +to the filer, and close the segment to the clients. + +Then the clients should start a new segment. The masters should assign other healthy brokers to handle the new segment. + +So any brokers can go down without losing data. + +## Auto Split or Merge + +(The idea is learned from Pravega.) + +The brokers should report its traffic load to the broker leader periodically. + +If any segment has too much load, the broker leader will ask the brokers to tell the client to +close current one and create two new segments. + +If 2 neighboring segments have the combined load below average load per segment, the broker leader will ask +the brokers to tell the client to close this 2 segments and create a new segment. diff --git a/weed/mq/broker.go b/weed/mq/broker.go new file mode 100644 index 000000000..8debcec0b --- /dev/null +++ b/weed/mq/broker.go @@ -0,0 +1,12 @@ +package mq + +const LAST_MINUTES = 10 + +type TopicStat struct { + MessageCounts [LAST_MINUTES]int64 + ByteCounts [LAST_MINUTES]int64 +} + +func NewTopicStat() *TopicStat { + return &TopicStat{} +} diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go new file mode 100644 index 000000000..cbd17b3b9 --- /dev/null +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -0,0 +1,207 @@ +package broker + +import ( + "context" + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sort" + "sync" +) + +func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { + ret := &mq_pb.FindBrokerLeaderResponse{} + err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.BrokerType, + FilerGroup: request.FilerGroup, + IsLeaderOnly: true, + }) + if err != nil { + return err + } + if len(resp.ClusterNodes) == 0 { + return nil + } + ret.Broker = resp.ClusterNodes[0].Address + return nil + }) + return ret, err +} + +func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { + ret := &mq_pb.CheckSegmentStatusResponse{} + // TODO add in memory active segment + return ret, nil +} + +func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { + ret := &mq_pb.CheckBrokerLoadResponse{} + // TODO read broker's load + return ret, nil +} + +func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) { + ret := &mq_pb.AssignSegmentBrokersResponse{} + segment := mq.FromPbSegment(request.Segment) + + // check existing segment locations on filer + existingBrokers, err := broker.checkSegmentOnFiler(segment) + if err != nil { + return ret, err + } + + if len(existingBrokers) > 0 { + // good if the segment is still on the brokers + isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) + if err != nil { + return ret, err + } + if isActive { + for _, broker := range existingBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil + } + } + + // randomly pick up to 10 brokers, and find the ones with the lightest load + selectedBrokers, err := broker.selectBrokers() + if err != nil { + return ret, err + } + + // save the allocated brokers info for this segment on the filer + if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil { + return ret, err + } + + for _, broker := range selectedBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil +} + +func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { + var wg sync.WaitGroup + + for _, candidate := range brokers { + wg.Add(1) + go func(candidate pb.ServerAddress) { + defer wg.Done() + broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error { + resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{ + Segment: &mq_pb.Segment{ + Namespace: string(segment.Topic.Namespace), + Topic: segment.Topic.Name, + Id: segment.Id, + }, + }) + if checkErr != nil { + err = checkErr + glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr) + return nil + } + if resp.IsActive == false { + active = false + } + return nil + }) + }(candidate) + } + wg.Wait() + return +} + +func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) { + candidates, err := broker.selectCandidatesFromMaster(10) + if err != nil { + return + } + brokers, err = broker.pickLightestCandidates(candidates, 3) + return +} + +func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) { + err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.BrokerType, + FilerGroup: broker.option.FilerGroup, + Limit: limit, + }) + if err != nil { + return err + } + if len(resp.ClusterNodes) == 0 { + return nil + } + for _, node := range resp.ClusterNodes { + candidates = append(candidates, pb.ServerAddress(node.Address)) + } + return nil + }) + return +} + +type CandidateStatus struct { + address pb.ServerAddress + messageCount int64 + bytesCount int64 + load int64 +} + +func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) { + + if len(candidates) <= limit { + return candidates, nil + } + + candidateStatuses, err := broker.checkBrokerStatus(candidates) + if err != nil { + return nil, err + } + + sort.Slice(candidateStatuses, func(i, j int) bool { + return candidateStatuses[i].load < candidateStatuses[j].load + }) + + for i, candidate := range candidateStatuses { + if i >= limit { + break + } + selected = append(selected, candidate.address) + } + + return +} + +func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) { + + candidateStatuses = make([]*CandidateStatus, len(candidates)) + var wg sync.WaitGroup + for i, candidate := range candidates { + wg.Add(1) + go func(i int, candidate pb.ServerAddress) { + defer wg.Done() + err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error { + resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{}) + if checkErr != nil { + err = checkErr + return err + } + candidateStatuses[i] = &CandidateStatus{ + address: candidate, + messageCount: resp.MessageCount, + bytesCount: resp.BytesCount, + load: resp.MessageCount + resp.BytesCount/(64*1024), + } + return nil + }) + }(i, candidate) + } + wg.Wait() + return +} diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go new file mode 100644 index 000000000..a26be5171 --- /dev/null +++ b/weed/mq/broker/brokder_grpc_pub.go @@ -0,0 +1,16 @@ +package broker + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +/* +The messages is buffered in memory, and saved to filer under + /topics/<topic>/<date>/<hour>/<segment>/*.msg + /topics/<topic>/<date>/<hour>/segment + /topics/<topic>/info/segment_<id>.meta +*/ + +func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { + return nil +} diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go new file mode 100644 index 000000000..c0741071f --- /dev/null +++ b/weed/mq/broker/broker_segment_serde.go @@ -0,0 +1,89 @@ +package broker + +import ( + "bytes" + "fmt" + "github.com/golang/protobuf/jsonpb" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/mq" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "time" +) + +func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { + info, found, err := broker.readSegmentOnFiler(segment) + if err != nil { + return + } + if !found { + return + } + for _, b := range info.Brokers { + brokers = append(brokers, pb.ServerAddress(b)) + } + + return +} + +func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { + var nodes []string + for _, b := range brokers { + nodes = append(nodes, string(b)) + } + broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{ + Segment: segment.ToPbSegment(), + StartTsNs: time.Now().UnixNano(), + Brokers: nodes, + StopTsNs: 0, + PreviousSegments: nil, + NextSegments: nil, + }) + return +} + +func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) { + dir, name := segment.DirAndName() + + found, err = filer_pb.Exists(broker, dir, name, false) + if !found || err != nil { + return + } + + err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // read filer conf first + data, err := filer.ReadInsideFiler(client, dir, name) + if err != nil { + return fmt.Errorf("ReadEntry: %v", err) + } + + // parse into filer conf object + info = &mq_pb.SegmentInfo{} + if err = jsonpb.Unmarshal(bytes.NewReader(data), info); err != nil { + return err + } + found = true + return nil + }) + + return +} + +func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) { + dir, name := segment.DirAndName() + + var buf bytes.Buffer + filer.ProtoToText(&buf, info) + + err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // read filer conf first + err := filer.SaveInsideFiler(client, dir, name, buf.Bytes()) + if err != nil { + return fmt.Errorf("save segment info: %v", err) + } + return nil + }) + + return +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go new file mode 100644 index 000000000..512667d15 --- /dev/null +++ b/weed/mq/broker/broker_server.go @@ -0,0 +1,109 @@ +package broker + +import ( + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "google.golang.org/grpc" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +type MessageQueueBrokerOption struct { + Masters map[string]pb.ServerAddress + FilerGroup string + DataCenter string + Rack string + DefaultReplication string + MaxMB int + Ip string + Port int + Cipher bool +} + +type MessageQueueBroker struct { + mq_pb.UnimplementedSeaweedMessagingServer + option *MessageQueueBrokerOption + grpcDialOption grpc.DialOption + MasterClient *wdclient.MasterClient + filers map[pb.ServerAddress]struct{} + currentFiler pb.ServerAddress +} + +func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { + + mqBroker = &MessageQueueBroker{ + option: option, + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), + filers: make(map[pb.ServerAddress]struct{}), + } + mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate + + go mqBroker.MasterClient.KeepConnectedToMaster() + + existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType) + for _, newNode := range existingNodes { + mqBroker.OnBrokerUpdate(newNode, time.Now()) + } + + return mqBroker, nil +} + +func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return + } + + address := pb.ServerAddress(update.Address) + if update.IsAdd { + broker.filers[address] = struct{}{} + if broker.currentFiler == "" { + broker.currentFiler = address + } + } else { + delete(broker.filers, address) + if broker.currentFiler == address { + for filer, _ := range broker.filers { + broker.currentFiler = filer + break + } + } + } + +} + +func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress { + return broker.currentFiler +} + +func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(streamingMode, broker.GetFiler(), broker.grpcDialOption, fn) + +} + +func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string { + + return location.Url + +} + +func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { + + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return fn(client) + }) + +} + +func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error { + + return pb.WithBrokerClient(streamingMode, server, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + return fn(client) + }) + +} diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go new file mode 100644 index 000000000..66a76c57d --- /dev/null +++ b/weed/mq/segment/message_serde.go @@ -0,0 +1,109 @@ +package segment + +import ( + flatbuffers "github.com/google/flatbuffers/go" + "github.com/seaweedfs/seaweedfs/weed/pb/message_fbs" +) + +type MessageBatchBuilder struct { + b *flatbuffers.Builder + producerId int32 + producerEpoch int32 + segmentId int32 + flags int32 + messageOffsets []flatbuffers.UOffsetT + segmentSeqBase int64 + segmentSeqLast int64 + tsMsBase int64 + tsMsLast int64 +} + +func NewMessageBatchBuilder(b *flatbuffers.Builder, + producerId int32, + producerEpoch int32, + segmentId int32, + flags int32) *MessageBatchBuilder { + + b.Reset() + + return &MessageBatchBuilder{ + b: b, + producerId: producerId, + producerEpoch: producerEpoch, + segmentId: segmentId, + flags: flags, + } +} + +func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) { + if builder.segmentSeqBase == 0 { + builder.segmentSeqBase = segmentSeq + } + builder.segmentSeqLast = segmentSeq + if builder.tsMsBase == 0 { + builder.tsMsBase = tsMs + } + builder.tsMsLast = tsMs + + var names, values, pairs []flatbuffers.UOffsetT + for k, v := range properties { + names = append(names, builder.b.CreateString(k)) + values = append(values, builder.b.CreateByteVector(v)) + } + for i, _ := range names { + message_fbs.NameValueStart(builder.b) + message_fbs.NameValueAddName(builder.b, names[i]) + message_fbs.NameValueAddValue(builder.b, values[i]) + pair := message_fbs.NameValueEnd(builder.b) + pairs = append(pairs, pair) + } + + message_fbs.MessageStartPropertiesVector(builder.b, len(properties)) + for i := len(pairs) - 1; i >= 0; i-- { + builder.b.PrependUOffsetT(pairs[i]) + } + propOffset := builder.b.EndVector(len(properties)) + + keyOffset := builder.b.CreateByteVector(key) + valueOffset := builder.b.CreateByteVector(value) + + message_fbs.MessageStart(builder.b) + message_fbs.MessageAddSeqDelta(builder.b, int32(segmentSeq-builder.segmentSeqBase)) + message_fbs.MessageAddTsMsDelta(builder.b, int32(tsMs-builder.tsMsBase)) + + message_fbs.MessageAddProperties(builder.b, propOffset) + message_fbs.MessageAddKey(builder.b, keyOffset) + message_fbs.MessageAddData(builder.b, valueOffset) + messageOffset := message_fbs.MessageEnd(builder.b) + + builder.messageOffsets = append(builder.messageOffsets, messageOffset) + +} + +func (builder *MessageBatchBuilder) BuildMessageBatch() { + message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets)) + for i := len(builder.messageOffsets) - 1; i >= 0; i-- { + builder.b.PrependUOffsetT(builder.messageOffsets[i]) + } + messagesOffset := builder.b.EndVector(len(builder.messageOffsets)) + + message_fbs.MessageBatchStart(builder.b) + message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId) + message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch) + message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId) + message_fbs.MessageBatchAddFlags(builder.b, builder.flags) + message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase) + message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase)) + message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase) + message_fbs.MessageBatchAddTsMsMaxDelta(builder.b, int32(builder.tsMsLast-builder.tsMsBase)) + + message_fbs.MessageBatchAddMessages(builder.b, messagesOffset) + + messageBatch := message_fbs.MessageBatchEnd(builder.b) + + builder.b.Finish(messageBatch) +} + +func (builder *MessageBatchBuilder) GetBytes() []byte { + return builder.b.FinishedBytes() +} diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go new file mode 100644 index 000000000..cb7ad1501 --- /dev/null +++ b/weed/mq/segment/message_serde_test.go @@ -0,0 +1,56 @@ +package segment + +import ( + flatbuffers "github.com/google/flatbuffers/go" + "github.com/seaweedfs/seaweedfs/weed/pb/message_fbs" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMessageSerde(t *testing.T) { + b := flatbuffers.NewBuilder(1024) + + prop := make(map[string][]byte) + prop["n1"] = []byte("v1") + prop["n2"] = []byte("v2") + + bb := NewMessageBatchBuilder(b, 1, 2, 3, 4) + + bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here")) + + bb.BuildMessageBatch() + + buf := bb.GetBytes() + + println("serialized size", len(buf)) + + mb := message_fbs.GetRootAsMessageBatch(buf, 0) + + assert.Equal(t, int32(1), mb.ProducerId()) + assert.Equal(t, int32(2), mb.ProducerEpoch()) + assert.Equal(t, int32(3), mb.SegmentId()) + assert.Equal(t, int32(4), mb.Flags()) + assert.Equal(t, int64(5), mb.SegmentSeqBase()) + assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta()) + assert.Equal(t, int64(6), mb.TsMsBase()) + assert.Equal(t, int32(0), mb.TsMsMaxDelta()) + + assert.Equal(t, 1, mb.MessagesLength()) + + m := &message_fbs.Message{} + mb.Messages(m, 0) + + nv := &message_fbs.NameValue{} + m.Properties(nv, 0) + assert.Equal(t, "n1", string(nv.Name())) + assert.Equal(t, "v1", string(nv.Value())) + m.Properties(nv, 1) + assert.Equal(t, "n2", string(nv.Name())) + assert.Equal(t, "v2", string(nv.Value())) + assert.Equal(t, []byte("the primary key"), m.Key()) + assert.Equal(t, []byte("body is here"), m.Data()) + + assert.Equal(t, int32(0), m.SeqDelta()) + assert.Equal(t, int32(0), m.TsMsDelta()) + +} diff --git a/weed/mq/segment/segment_serde.go b/weed/mq/segment/segment_serde.go new file mode 100644 index 000000000..e076271d6 --- /dev/null +++ b/weed/mq/segment/segment_serde.go @@ -0,0 +1 @@ +package segment diff --git a/weed/mq/topic.go b/weed/mq/topic.go new file mode 100644 index 000000000..96544bac9 --- /dev/null +++ b/weed/mq/topic.go @@ -0,0 +1,62 @@ +package mq + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "time" +) + +type Namespace string + +type Topic struct { + Namespace Namespace + Name string +} + +type Partition struct { + RangeStart int32 + RangeStop int32 // exclusive + RingSize int32 +} + +type Segment struct { + Topic Topic + Id int32 + Partition Partition + LastModified time.Time +} + +func FromPbSegment(segment *mq_pb.Segment) *Segment { + return &Segment{ + Topic: Topic{ + Namespace: Namespace(segment.Namespace), + Name: segment.Topic, + }, + Id: segment.Id, + Partition: Partition{ + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + RingSize: segment.Partition.RingSize, + }, + } +} + +func (segment *Segment) ToPbSegment() *mq_pb.Segment { + return &mq_pb.Segment{ + Namespace: string(segment.Topic.Namespace), + Topic: segment.Topic.Name, + Id: segment.Id, + Partition: &mq_pb.Partition{ + RingSize: segment.Partition.RingSize, + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + }, + } +} + +func (segment *Segment) DirAndName() (dir string, name string) { + dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name) + name = fmt.Sprintf("%4d.segment", segment.Id) + return +} |
