aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker.go12
-rw-r--r--weed/mq/broker/broker_append.go130
-rw-r--r--weed/mq/broker/broker_grpc_server.go37
-rw-r--r--weed/mq/broker/broker_grpc_server_discovery.go50
-rw-r--r--weed/mq/broker/broker_grpc_server_publish.go112
-rw-r--r--weed/mq/broker/broker_grpc_server_subscribe.go178
-rw-r--r--weed/mq/broker/broker_server.go64
-rw-r--r--weed/mq/broker/topic_manager.go124
-rw-r--r--weed/mq/msgclient/chan_config.go5
-rw-r--r--weed/mq/msgclient/chan_pub.go76
-rw-r--r--weed/mq/msgclient/chan_sub.go85
-rw-r--r--weed/mq/msgclient/client.go55
-rw-r--r--weed/mq/msgclient/config.go63
-rw-r--r--weed/mq/msgclient/publisher.go118
-rw-r--r--weed/mq/msgclient/subscriber.go120
-rw-r--r--weed/mq/topic.go23
16 files changed, 35 insertions, 1217 deletions
diff --git a/weed/mq/broker.go b/weed/mq/broker.go
new file mode 100644
index 000000000..8debcec0b
--- /dev/null
+++ b/weed/mq/broker.go
@@ -0,0 +1,12 @@
+package mq
+
+const LAST_MINUTES = 10
+
+type TopicStat struct {
+ MessageCounts [LAST_MINUTES]int64
+ ByteCounts [LAST_MINUTES]int64
+}
+
+func NewTopicStat() *TopicStat {
+ return &TopicStat{}
+}
diff --git a/weed/mq/broker/broker_append.go b/weed/mq/broker/broker_append.go
deleted file mode 100644
index c8e0da93c..000000000
--- a/weed/mq/broker/broker_append.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (broker *MessageQueueBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
-
- assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
- if err2 != nil {
- return err2
- }
-
- dir, name := util.FullPath(targetFile).DirAndName()
-
- // append the chunk
- if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AppendToEntryRequest{
- Directory: dir,
- EntryName: name,
- Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
- }
-
- _, err := client.AppendToEntry(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("append to file %v: %v", request, err)
- return err
- }
-
- return nil
- }); err != nil {
- return fmt.Errorf("append to file %v: %v", targetFile, err)
- }
-
- return nil
-}
-
-func (broker *MessageQueueBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
-
- var assignResult = &operation.AssignResult{}
-
- // assign a volume location
- if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- assignErr := util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: topicConfig.Replication,
- Collection: topicConfig.Collection,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- assignResult.Auth = security.EncodedJwt(resp.Auth)
- assignResult.Fid = resp.FileId
- assignResult.Url = resp.Location.Url
- assignResult.PublicUrl = resp.Location.PublicUrl
- assignResult.GrpcPort = int(resp.Location.GrpcPort)
- assignResult.Count = uint64(resp.Count)
-
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- return nil
- }); err != nil {
- return nil, nil, err
- }
-
- // upload data
- targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
- uploadOption := &operation.UploadOption{
- UploadUrl: targetUrl,
- Filename: "",
- Cipher: broker.option.Cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: assignResult.Auth,
- }
- uploadResult, err := operation.UploadData(data, uploadOption)
- if err != nil {
- return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
- }
- // println("uploaded to", targetUrl)
- return assignResult, uploadResult, nil
-}
-
-var _ = filer_pb.FilerClient(&MessageQueueBroker{})
-
-func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
-
- for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
- if err == io.EOF {
- return
- }
- glog.V(0).Infof("fail to connect to %s: %v", filer, err)
- } else {
- break
- }
- }
-
- return
-
-}
-
-func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
diff --git a/weed/mq/broker/broker_grpc_server.go b/weed/mq/broker/broker_grpc_server.go
deleted file mode 100644
index 2cb4187ae..000000000
--- a/weed/mq/broker/broker_grpc_server.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-func (broker *MessageQueueBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) {
- panic("implement me")
-}
-
-func (broker *MessageQueueBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) {
- resp := &mq_pb.DeleteTopicResponse{}
- dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
- if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
- return nil, err
- } else if exists {
- err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil)
- }
- return resp, nil
-}
-
-func (broker *MessageQueueBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) {
- panic("implement me")
-}
-
-func genTopicDir(namespace, topic string) string {
- return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic)
-}
-
-func genTopicDirEntry(namespace, topic string) (dir, entry string) {
- return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic
-}
diff --git a/weed/mq/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go
index e276091a9..94e89cd41 100644
--- a/weed/mq/broker/broker_grpc_server_discovery.go
+++ b/weed/mq/broker/broker_grpc_server_discovery.go
@@ -2,7 +2,6 @@ package broker
import (
"context"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb"
"time"
@@ -10,57 +9,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-/*
-Topic discovery:
-
-When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
-
-The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
-Otherwise, just host the topic.
-
-So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
-If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
-
-*/
-
-func (broker *MessageQueueBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) {
-
- t := &mq_pb.FindBrokerResponse{}
- var peers []string
-
- targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
-
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
- Resource: targetTopicPartition,
- })
- if err != nil {
- return err
- }
- if resp.Found && len(resp.Resources) > 0 {
- t.Broker = resp.Resources[0].GrpcAddresses
- return nil
- }
- for _, b := range resp.Resources {
- peers = append(peers, b.GrpcAddresses)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- }
-
- t.Broker = PickMember(peers, []byte(targetTopicPartition))
-
- return t, nil
-
-}
-
func (broker *MessageQueueBroker) checkFilers() {
// contact a filer about masters
diff --git a/weed/mq/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go
deleted file mode 100644
index eb76dd5dc..000000000
--- a/weed/mq/broker/broker_grpc_server_publish.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package broker
-
-import (
- "crypto/md5"
- "fmt"
- "io"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- // TODO look it up
- topicConfig := &mq_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // send init response
- initResponse := &mq_pb.PublishResponse{
- Config: nil,
- Redirect: nil,
- }
- err = stream.Send(initResponse)
- if err != nil {
- return err
- }
- if initResponse.Redirect != nil {
- return nil
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
-
- tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
- md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
- // println("chan data stored under", tpDir, "as", md5File)
-
- if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
- return fmt.Errorf("channel is already closed")
- }
-
- tl := broker.topicManager.RequestLock(tp, topicConfig, true)
- defer broker.topicManager.ReleaseLock(tp, true)
-
- md5hash := md5.New()
- // process each message
- for {
- // println("recv")
- in, err := stream.Recv()
- // glog.V(0).Infof("recieved %v err: %v", in, err)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- if in.Data == nil {
- continue
- }
-
- // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
-
- data, err := proto.Marshal(in.Data)
- if err != nil {
- glog.Errorf("marshall error: %v\n", err)
- continue
- }
-
- tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
-
- if in.Data.IsClose {
- // println("server received closing")
- break
- }
-
- md5hash.Write(in.Data.Value)
-
- }
-
- if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
- glog.V(0).Infof("err writing %s: %v", md5File, err)
- }
-
- // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
-
- // send the close ack
- // println("server send ack closing")
- if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil {
- glog.V(0).Infof("err sending close response: %v", err)
- }
- return nil
-
-}
diff --git a/weed/mq/broker/broker_grpc_server_subscribe.go b/weed/mq/broker/broker_grpc_server_subscribe.go
deleted file mode 100644
index 3743218b1..000000000
--- a/weed/mq/broker/broker_grpc_server_subscribe.go
+++ /dev/null
@@ -1,178 +0,0 @@
-package broker
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
- "io"
- "strings"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-func (broker *MessageQueueBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- var processedTsNs int64
- var messageCount int64
- subscriberId := in.Init.SubscriberId
-
- // TODO look it up
- topicConfig := &mq_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
- fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
- defer func() {
- fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
- }()
-
- lock := broker.topicManager.RequestLock(tp, topicConfig, false)
- defer broker.topicManager.ReleaseLock(tp, false)
-
- isConnected := true
- go func() {
- for isConnected {
- if _, err := stream.Recv(); err != nil {
- // println("disconnecting connection to", subscriberId, tp.String())
- isConnected = false
- lock.cond.Signal()
- }
- }
- }()
-
- lastReadTime := time.Now()
- switch in.Init.StartPosition {
- case mq_pb.SubscriberMessage_InitMessage_TIMESTAMP:
- lastReadTime = time.Unix(0, in.Init.TimestampNs)
- case mq_pb.SubscriberMessage_InitMessage_LATEST:
- case mq_pb.SubscriberMessage_InitMessage_EARLIEST:
- lastReadTime = time.Unix(0, 0)
- }
-
- // how to process each message
- // an error returned will end the subscription
- eachMessageFn := func(m *mq_pb.Message) error {
- err := stream.Send(&mq_pb.BrokerMessage{
- Data: m,
- })
- if err != nil {
- glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
- }
- return err
- }
-
- eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
- m := &mq_pb.Message{}
- if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
- return err
- }
- // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
- if err = eachMessageFn(m); err != nil {
- glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
- return err
- }
- if m.IsClose {
- // println("processed EOF")
- return io.EOF
- }
- processedTsNs = logEntry.TsNs
- messageCount++
- return nil
- }
-
- // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
-
- for {
-
- if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
- if err != io.EOF {
- // println("stopping from persisted logs", err.Error())
- return err
- }
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
-
- lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool {
- lock.Mutex.Lock()
- lock.cond.Wait()
- lock.Mutex.Unlock()
- return isConnected
- }, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
- continue
- }
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- if err != log_buffer.ResumeError {
- break
- }
- }
- }
-
- return err
-
-}
-
-func (broker *MessageQueueBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
- startTime = startTime.UTC()
- startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
-
- sizeBuf := make([]byte, 4)
- startTsNs := startTime.UnixNano()
-
- topicDir := genTopicDir(tp.Namespace, tp.Topic)
- partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
-
- return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
- dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
- return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
- if dayEntry.Name == startDate {
- hourMinute := util.FileNameBase(hourMinuteEntry.Name)
- if strings.Compare(hourMinute, startHourMinute) < 0 {
- return nil
- }
- }
- if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
- return nil
- }
- // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
- chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
- defer chunkedFileReader.Close()
- if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil {
- chunkedFileReader.Close()
- if err == io.EOF {
- return err
- }
- return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
- }
- return nil
- }, "", false, 24*60)
- }, startDate, true, 366)
-
-}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 36f216a48..63e248797 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,15 +1,11 @@
package broker
import (
- "context"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "time"
-
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -33,7 +29,6 @@ type MessageQueueBroker struct {
option *MessageQueueBrokerOption
grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient
- topicManager *TopicManager
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -44,72 +39,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
}
- mqBroker.topicManager = NewTopicManager(mqBroker)
-
mqBroker.checkFilers()
- go mqBroker.keepConnectedToOneFiler()
go mqBroker.MasterClient.KeepConnectedToMaster()
return mqBroker, nil
}
-func (broker *MessageQueueBroker) keepConnectedToOneFiler() {
-
- for {
- for _, filer := range broker.option.Filers {
- broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.KeepConnected(ctx)
- if err != nil {
- glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- initRequest := &filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }
- for _, tp := range broker.topicManager.ListTopicPartitions() {
- initRequest.Resources = append(initRequest.Resources, tp.String())
- }
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- // TODO send events of adding/removing topics
-
- glog.V(0).Infof("conntected with filer: %v", filer)
- for {
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("send heartbeat")
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("received reply")
- time.Sleep(11 * time.Second)
- // println("woke up")
- }
- return nil
- })
- time.Sleep(3 * time.Second)
- }
- }
-
-}
-
func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
diff --git a/weed/mq/broker/topic_manager.go b/weed/mq/broker/topic_manager.go
deleted file mode 100644
index 34f063d9a..000000000
--- a/weed/mq/broker/topic_manager.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package broker
-
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
-)
-
-type TopicPartition struct {
- Namespace string
- Topic string
- Partition int32
-}
-
-const (
- TopicPartitionFmt = "%s/%s_%02d"
-)
-
-func (tp *TopicPartition) String() string {
- return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
-}
-
-type TopicControl struct {
- sync.Mutex
- cond *sync.Cond
- subscriberCount int
- publisherCount int
- logBuffer *log_buffer.LogBuffer
-}
-
-type TopicManager struct {
- sync.Mutex
- topicControls map[TopicPartition]*TopicControl
- broker *MessageQueueBroker
-}
-
-func NewTopicManager(messageBroker *MessageQueueBroker) *TopicManager {
- return &TopicManager{
- topicControls: make(map[TopicPartition]*TopicControl),
- broker: messageBroker,
- }
-}
-
-func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *mq_pb.TopicConfiguration) *log_buffer.LogBuffer {
-
- flushFn := func(startTime, stopTime time.Time, buf []byte) {
-
- if topicConfig.IsTransient {
- // return
- }
-
- // fmt.Printf("flushing with topic config %+v\n", topicConfig)
-
- startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer.TopicsDir, tp.Namespace, tp.Topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- tp.Partition,
- )
-
- if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
- }
- logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() {
- tl.cond.Broadcast()
- })
-
- return logBuffer
-}
-
-func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *mq_pb.TopicConfiguration, isPublisher bool) *TopicControl {
- tm.Lock()
- defer tm.Unlock()
-
- tc, found := tm.topicControls[partition]
- if !found {
- tc = &TopicControl{}
- tc.cond = sync.NewCond(&tc.Mutex)
- tm.topicControls[partition] = tc
- tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
- }
- if isPublisher {
- tc.publisherCount++
- } else {
- tc.subscriberCount++
- }
- return tc
-}
-
-func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
- tm.Lock()
- defer tm.Unlock()
-
- lock, found := tm.topicControls[partition]
- if !found {
- return
- }
- if isPublisher {
- lock.publisherCount--
- } else {
- lock.subscriberCount--
- }
- if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tm.topicControls, partition)
- lock.logBuffer.Shutdown()
- }
-}
-
-func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
- tm.Lock()
- defer tm.Unlock()
-
- for k := range tm.topicControls {
- tps = append(tps, k)
- }
- return
-}
diff --git a/weed/mq/msgclient/chan_config.go b/weed/mq/msgclient/chan_config.go
deleted file mode 100644
index a75678815..000000000
--- a/weed/mq/msgclient/chan_config.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package msgclient
-
-func (mc *MessagingClient) DeleteChannel(chanName string) error {
- return mc.DeleteTopic("chan", chanName)
-}
diff --git a/weed/mq/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go
deleted file mode 100644
index f4ffe832a..000000000
--- a/weed/mq/msgclient/chan_pub.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package msgclient
-
-import (
- "crypto/md5"
- "hash"
- "io"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type PubChannel struct {
- client mq_pb.SeaweedMessaging_PublishClient
- grpcConnection *grpc.ClientConn
- md5hash hash.Hash
-}
-
-func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
- tp := broker.TopicPartition{
- Namespace: "chan",
- Topic: chanName,
- Partition: 0,
- }
- grpcConnection, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- pc, err := setupPublisherClient(grpcConnection, tp)
- if err != nil {
- return nil, err
- }
- return &PubChannel{
- client: pc,
- grpcConnection: grpcConnection,
- md5hash: md5.New(),
- }, nil
-}
-
-func (pc *PubChannel) Publish(m []byte) error {
- err := pc.client.Send(&mq_pb.PublishRequest{
- Data: &mq_pb.Message{
- Value: m,
- },
- })
- if err == nil {
- pc.md5hash.Write(m)
- }
- return err
-}
-func (pc *PubChannel) Close() error {
-
- // println("send closing")
- if err := pc.client.Send(&mq_pb.PublishRequest{
- Data: &mq_pb.Message{
- IsClose: true,
- },
- }); err != nil {
- log.Printf("err send close: %v", err)
- }
- // println("receive closing")
- if _, err := pc.client.Recv(); err != nil && err != io.EOF {
- log.Printf("err receive close: %v", err)
- }
- // println("close connection")
- if err := pc.grpcConnection.Close(); err != nil {
- log.Printf("err connection close: %v", err)
- }
- return nil
-}
-
-func (pc *PubChannel) Md5() []byte {
- return pc.md5hash.Sum(nil)
-}
diff --git a/weed/mq/msgclient/chan_sub.go b/weed/mq/msgclient/chan_sub.go
deleted file mode 100644
index 859b482ef..000000000
--- a/weed/mq/msgclient/chan_sub.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package msgclient
-
-import (
- "context"
- "crypto/md5"
- "hash"
- "io"
- "log"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type SubChannel struct {
- ch chan []byte
- stream mq_pb.SeaweedMessaging_SubscribeClient
- md5hash hash.Hash
- cancel context.CancelFunc
-}
-
-func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
- tp := broker.TopicPartition{
- Namespace: "chan",
- Topic: chanName,
- Partition: 0,
- }
- grpcConnection, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
- sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
- if err != nil {
- return nil, err
- }
-
- t := &SubChannel{
- ch: make(chan []byte),
- stream: sc,
- md5hash: md5.New(),
- cancel: cancel,
- }
-
- go func() {
- for {
- resp, subErr := t.stream.Recv()
- if subErr == io.EOF {
- return
- }
- if subErr != nil {
- log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
- return
- }
- if resp.Data == nil {
- // this could be heartbeat from broker
- continue
- }
- if resp.Data.IsClose {
- t.stream.Send(&mq_pb.SubscriberMessage{
- IsClose: true,
- })
- close(t.ch)
- cancel()
- return
- }
- t.ch <- resp.Data.Value
- t.md5hash.Write(resp.Data.Value)
- }
- }()
-
- return t, nil
-}
-
-func (sc *SubChannel) Channel() chan []byte {
- return sc.ch
-}
-
-func (sc *SubChannel) Md5() []byte {
- return sc.md5hash.Sum(nil)
-}
-
-func (sc *SubChannel) Cancel() {
- sc.cancel()
-}
diff --git a/weed/mq/msgclient/client.go b/weed/mq/msgclient/client.go
deleted file mode 100644
index cc64f1acb..000000000
--- a/weed/mq/msgclient/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package msgclient
-
-import (
- "context"
- "fmt"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnections map[broker.TopicPartition]*grpc.ClientConn
- grpcDialOption grpc.DialOption
-}
-
-func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
- }
-}
-
-func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
-
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
- &mq_pb.FindBrokerRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Parition: tp.Partition,
- })
- if err != nil {
- return nil, err
- }
-
- targetBroker := resp.Broker
- return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
- }
- return nil, fmt.Errorf("no broker found for %+v", tp)
-}
diff --git a/weed/mq/msgclient/config.go b/weed/mq/msgclient/config.go
deleted file mode 100644
index 263ee856e..000000000
--- a/weed/mq/msgclient/config.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package msgclient
-
-import (
- "context"
- "log"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
-
- return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.ConfigureTopic(context.Background(),
- &mq_pb.ConfigureTopicRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Configuration: &mq_pb.TopicConfiguration{
- PartitionCount: 0,
- Collection: "",
- Replication: "",
- IsTransient: false,
- Partitoning: 0,
- },
- })
- return err
- })
-
-}
-
-func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
-
- return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.DeleteTopic(context.Background(),
- &mq_pb.DeleteTopicRequest{
- Namespace: namespace,
- Topic: topic,
- })
- return err
- })
-}
-
-func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error {
-
- var lastErr error
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection))
- if err == nil {
- return nil
- }
- lastErr = err
- }
-
- return lastErr
-}
diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go
deleted file mode 100644
index 823791d10..000000000
--- a/weed/mq/msgclient/publisher.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package msgclient
-
-import (
- "context"
-
- "github.com/OneOfOne/xxhash"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type Publisher struct {
- publishClients []mq_pb.SeaweedMessaging_PublishClient
- topicConfiguration *mq_pb.TopicConfiguration
- messageCount uint64
- publisherId string
-}
-
-func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
- // read topic configuration
- topicConfiguration := &mq_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- client, err := setupPublisherClient(grpcClientConn, tp)
- if err != nil {
- return nil, err
- }
- publishClients[i] = client
- }
- return &Publisher{
- publishClients: publishClients,
- topicConfiguration: topicConfiguration,
- }, nil
-}
-
-func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
-
- stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&mq_pb.PublishRequest{
- Init: &mq_pb.PublishRequest_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
- if initResponse.Config != nil {
- }
-
- // setup looks for control messages
- doneChan := make(chan error, 1)
- go func() {
- for {
- in, err := stream.Recv()
- if err != nil {
- doneChan <- err
- return
- }
- if in.Redirect != nil {
- }
- if in.Config != nil {
- }
- }
- }()
-
- return stream, nil
-
-}
-
-func (p *Publisher) Publish(m *mq_pb.Message) error {
- hashValue := p.messageCount
- p.messageCount++
- if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
- if m.Key != nil {
- hashValue = xxhash.Checksum64(m.Key)
- }
- } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
- hashValue = xxhash.Checksum64(m.Key)
- } else {
- // round robin
- }
-
- idx := int(hashValue) % len(p.publishClients)
- if idx < 0 {
- idx += len(p.publishClients)
- }
- return p.publishClients[idx].Send(&mq_pb.PublishRequest{
- Data: m,
- })
-}
diff --git a/weed/mq/msgclient/subscriber.go b/weed/mq/msgclient/subscriber.go
deleted file mode 100644
index f3da40fb3..000000000
--- a/weed/mq/msgclient/subscriber.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package msgclient
-
-import (
- "context"
- "io"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
-)
-
-type Subscriber struct {
- subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient
- subscriberCancels []context.CancelFunc
- subscriberId string
-}
-
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
- // read topic configuration
- topicConfiguration := &mq_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
- subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
-
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- if partitionId >= 0 && i != partitionId {
- continue
- }
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
- client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
- if err != nil {
- return nil, err
- }
- subscriberClients[i] = client
- subscriberCancels[i] = cancel
- }
-
- return &Subscriber{
- subscriberClients: subscriberClients,
- subscriberCancels: subscriberCancels,
- subscriberId: subscriberId,
- }, nil
-}
-
-func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) {
- stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
- if err != nil {
- return
- }
-
- // send init message
- err = stream.Send(&mq_pb.SubscriberMessage{
- Init: &mq_pb.SubscriberMessage_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP,
- TimestampNs: startTime.UnixNano(),
- SubscriberId: subscriberId,
- },
- })
- if err != nil {
- return
- }
-
- return stream, nil
-}
-
-func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error {
- for {
- resp, listenErr := subscriberClient.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- println(listenErr.Error())
- return listenErr
- }
- if resp.Data == nil {
- // this could be heartbeat from broker
- continue
- }
- processFn(resp.Data)
- }
-}
-
-// Subscribe starts goroutines to process the messages
-func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) {
- var wg sync.WaitGroup
- for i := 0; i < len(s.subscriberClients); i++ {
- if s.subscriberClients[i] != nil {
- wg.Add(1)
- go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) {
- defer wg.Done()
- doSubscribe(subscriberClient, processFn)
- }(s.subscriberClients[i])
- }
- }
- wg.Wait()
-}
-
-func (s *Subscriber) Shutdown() {
- for i := 0; i < len(s.subscriberClients); i++ {
- if s.subscriberCancels[i] != nil {
- s.subscriberCancels[i]()
- }
- }
-}
diff --git a/weed/mq/topic.go b/weed/mq/topic.go
new file mode 100644
index 000000000..fc1923af1
--- /dev/null
+++ b/weed/mq/topic.go
@@ -0,0 +1,23 @@
+package mq
+
+import "time"
+
+type Namespace string
+
+type Topic struct {
+ namespace Namespace
+ name string
+}
+
+type Partition struct {
+ rangeStart int
+ rangeStop int // exclusive
+ ringSize int
+}
+
+type Segment struct {
+ topic Topic
+ id int32
+ partition Partition
+ lastModified time.Time
+}