diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker.go | 12 | ||||
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 212 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 103 | ||||
| -rw-r--r-- | weed/mq/topic.go | 36 |
4 files changed, 363 insertions, 0 deletions
diff --git a/weed/mq/broker.go b/weed/mq/broker.go new file mode 100644 index 000000000..8debcec0b --- /dev/null +++ b/weed/mq/broker.go @@ -0,0 +1,12 @@ +package mq + +const LAST_MINUTES = 10 + +type TopicStat struct { + MessageCounts [LAST_MINUTES]int64 + ByteCounts [LAST_MINUTES]int64 +} + +func NewTopicStat() *TopicStat { + return &TopicStat{} +} diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go new file mode 100644 index 000000000..31b5bb84e --- /dev/null +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -0,0 +1,212 @@ +package broker + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mq" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "sort" + "sync" +) + +func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { + ret := &mq_pb.FindBrokerLeaderResponse{} + err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.BrokerType, + FilerGroup: request.FilerGroup, + IsLeaderOnly: true, + }) + if err != nil { + return err + } + if len(resp.ClusterNodes) == 0 { + return nil + } + ret.Broker = resp.ClusterNodes[0].Address + return nil + }) + return ret, err +} + +func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { + ret := &mq_pb.CheckSegmentStatusResponse{} + // TODO add in memory active segment + return ret, nil +} + +func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { + ret := &mq_pb.CheckBrokerLoadResponse{} + // TODO read broker's load + return ret, nil +} + +func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) { + ret := &mq_pb.AssignSegmentBrokersResponse{} + segment := mq.FromPbSegment(request.Segment) + + // check existing segment locations on filer + existingBrokers, err := broker.checkSegmentOnFiler(segment) + if err != nil { + return ret, err + } + // good if the segment is still on the brokers + isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) + if err != nil { + return ret, err + } + if isActive { + for _, broker := range existingBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil + } + + // randomly pick up to 10 brokers, and find the ones with the lightest load + selectedBrokers, err := broker.selectBrokers() + if err != nil { + return ret, err + } + + // save the allocated brokers info for this segment on the filer + if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil { + return ret, err + } + + for _, broker := range selectedBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil +} + +func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { + return +} + +func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { + var wg sync.WaitGroup + + for _, candidate := range brokers { + wg.Add(1) + go func(candidate pb.ServerAddress) { + defer wg.Done() + broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error { + resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{ + Segment: &mq_pb.Segment{ + Namespace: string(segment.Topic.Namespace), + Topic: segment.Topic.Name, + Id: segment.Id, + }, + }) + if checkErr != nil { + err = checkErr + glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr) + return nil + } + if resp.IsActive == false { + active = false + } + return nil + }) + }(candidate) + } + wg.Wait() + return +} + +func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) { + candidates, err := broker.selectCandidatesFromMaster(10) + if err != nil { + return + } + brokers, err = broker.pickLightestCandidates(candidates, 3) + return +} + +func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) { + err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.BrokerType, + FilerGroup: broker.option.FilerGroup, + Limit: limit, + }) + if err != nil { + return err + } + if len(resp.ClusterNodes) == 0 { + return nil + } + for _, node := range resp.ClusterNodes { + candidates = append(candidates, pb.ServerAddress(node.Address)) + } + return nil + }) + return +} + +type CandidateStatus struct { + address pb.ServerAddress + messageCount int64 + bytesCount int64 + load int64 +} + +func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) { + + if len(candidates) <= limit { + return candidates, nil + } + + candidateStatuses, err := broker.checkBrokerStatus(candidates) + if err != nil { + return nil, err + } + + sort.Slice(candidateStatuses, func(i, j int) bool { + return candidateStatuses[i].load < candidateStatuses[j].load + }) + + for i, candidate := range candidateStatuses { + if i >= limit { + break + } + selected = append(selected, candidate.address) + } + + return +} + +func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) { + + candidateStatuses = make([]*CandidateStatus, len(candidates)) + var wg sync.WaitGroup + for i, candidate := range candidates { + wg.Add(1) + go func(i int, candidate pb.ServerAddress) { + defer wg.Done() + err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error { + resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{}) + if checkErr != nil { + err = checkErr + return err + } + candidateStatuses[i] = &CandidateStatus{ + address: candidate, + messageCount: resp.MessageCount, + bytesCount: resp.BytesCount, + load: resp.MessageCount + resp.BytesCount/(64*1024), + } + return nil + }) + }(i, candidate) + } + wg.Wait() + return +} + +func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { + return +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go new file mode 100644 index 000000000..f940b00c3 --- /dev/null +++ b/weed/mq/broker/broker_server.go @@ -0,0 +1,103 @@ +package broker + +import ( + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "google.golang.org/grpc" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) + +type MessageQueueBrokerOption struct { + Masters map[string]pb.ServerAddress + FilerGroup string + DataCenter string + Rack string + DefaultReplication string + MaxMB int + Ip string + Port int + Cipher bool +} + +type MessageQueueBroker struct { + mq_pb.UnimplementedSeaweedMessagingServer + option *MessageQueueBrokerOption + grpcDialOption grpc.DialOption + MasterClient *wdclient.MasterClient + filers map[pb.ServerAddress]struct{} + currentFiler pb.ServerAddress +} + +func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { + + mqBroker = &MessageQueueBroker{ + option: option, + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), + filers: make(map[pb.ServerAddress]struct{}), + } + mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate + + go mqBroker.MasterClient.KeepConnectedToMaster() + + existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType) + for _, newNode := range existingNodes { + mqBroker.OnBrokerUpdate(newNode, time.Now()) + } + + return mqBroker, nil +} + +func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return + } + + address := pb.ServerAddress(update.Address) + if update.IsAdd { + broker.filers[address] = struct{}{} + if broker.currentFiler == "" { + broker.currentFiler = address + } + } else { + delete(broker.filers, address) + if broker.currentFiler == address { + for filer, _ := range broker.filers { + broker.currentFiler = filer + break + } + } + } + +} + +func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress { + return broker.currentFiler +} + +func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) + +} + +func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { + + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return fn(client) + }) + +} + +func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error { + + return pb.WithBrokerClient(streamingMode, server, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + return fn(client) + }) + +} diff --git a/weed/mq/topic.go b/weed/mq/topic.go new file mode 100644 index 000000000..87621fca7 --- /dev/null +++ b/weed/mq/topic.go @@ -0,0 +1,36 @@ +package mq + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "time" +) + +type Namespace string + +type Topic struct { + Namespace Namespace + Name string +} + +type Partition struct { + RangeStart int + RangeStop int // exclusive + RingSize int +} + +type Segment struct { + Topic Topic + Id int32 + Partition Partition + LastModified time.Time +} + +func FromPbSegment(segment *mq_pb.Segment) *Segment { + return &Segment{ + Topic: Topic{ + Namespace: Namespace(segment.Namespace), + Name: segment.Topic, + }, + Id: segment.Id, + } +} |
