aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/messaging
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go113
-rw-r--r--weed/messaging/broker/broker_grpc_server.go37
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go116
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go112
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go177
-rw-r--r--weed/messaging/broker/broker_server.go114
-rw-r--r--weed/messaging/broker/consistent_distribution.go38
-rw-r--r--weed/messaging/broker/consistent_distribution_test.go32
-rw-r--r--weed/messaging/broker/topic_manager.go124
-rw-r--r--weed/messaging/msgclient/chan_config.go5
-rw-r--r--weed/messaging/msgclient/chan_pub.go76
-rw-r--r--weed/messaging/msgclient/chan_sub.go85
-rw-r--r--weed/messaging/msgclient/client.go55
-rw-r--r--weed/messaging/msgclient/config.go63
-rw-r--r--weed/messaging/msgclient/publisher.go118
-rw-r--r--weed/messaging/msgclient/subscriber.go120
16 files changed, 1385 insertions, 0 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
new file mode 100644
index 000000000..8e5b56fd0
--- /dev/null
+++ b/weed/messaging/broker/broker_append.go
@@ -0,0 +1,113 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
+
+ assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
+ if err2 != nil {
+ return err2
+ }
+
+ dir, name := util.FullPath(targetFile).DirAndName()
+
+ // append the chunk
+ if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AppendToEntryRequest{
+ Directory: dir,
+ EntryName: name,
+ Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
+ }
+
+ _, err := client.AppendToEntry(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("append to file %v: %v", request, err)
+ return err
+ }
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("append to file %v: %v", targetFile, err)
+ }
+
+ return nil
+}
+
+func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+
+ var assignResult = &operation.AssignResult{}
+
+ // assign a volume location
+ if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: topicConfig.Replication,
+ Collection: topicConfig.Collection,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ assignResult.Auth = security.EncodedJwt(resp.Auth)
+ assignResult.Fid = resp.FileId
+ assignResult.Url = resp.Url
+ assignResult.PublicUrl = resp.PublicUrl
+ assignResult.Count = uint64(resp.Count)
+
+ return nil
+ }); err != nil {
+ return nil, nil, err
+ }
+
+ // upload data
+ targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+ uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}
+
+var _ = filer_pb.FilerClient(&MessageBroker{})
+
+func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+ for _, filer := range broker.option.Filers {
+ if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err == io.EOF {
+ return
+ }
+ glog.V(0).Infof("fail to connect to %s: %v", filer, err)
+ } else {
+ break
+ }
+ }
+
+ return
+
+}
+
+func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
new file mode 100644
index 000000000..ba141fdd0
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server.go
@@ -0,0 +1,37 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
+ resp := &messaging_pb.DeleteTopicResponse{}
+ dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
+ if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
+ return nil, err
+ } else if exists {
+ err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil)
+ }
+ return resp, nil
+}
+
+func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
+ panic("implement me")
+}
+
+func genTopicDir(namespace, topic string) string {
+ return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic)
+}
+
+func genTopicDirEntry(namespace, topic string) (dir, entry string) {
+ return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic
+}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
new file mode 100644
index 000000000..3c14f3220
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -0,0 +1,116 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+/*
+Topic discovery:
+
+When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
+
+The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
+Otherwise, just host the topic.
+
+So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
+If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
+
+*/
+
+func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
+
+ t := &messaging_pb.FindBrokerResponse{}
+ var peers []string
+
+ targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
+
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
+ Resource: targetTopicPartition,
+ })
+ if err != nil {
+ return err
+ }
+ if resp.Found && len(resp.Resources) > 0 {
+ t.Broker = resp.Resources[0].GrpcAddresses
+ return nil
+ }
+ for _, b := range resp.Resources {
+ peers = append(peers, b.GrpcAddresses)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ t.Broker = PickMember(peers, []byte(targetTopicPartition))
+
+ return t, nil
+
+}
+
+func (broker *MessageBroker) checkFilers() {
+
+ // contact a filer about masters
+ var masters []string
+ found := false
+ for !found {
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received master list: %s", masters)
+
+ // contact each masters for filers
+ var filers []string
+ found = false
+ for !found {
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to list filers: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received filer list: %s", filers)
+
+ broker.option.Filers = filers
+
+}
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
new file mode 100644
index 000000000..6e6b723d1
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -0,0 +1,112 @@
+package broker
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
+
+ // process initial request
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ // TODO look it up
+ topicConfig := &messaging_pb.TopicConfiguration{
+ // IsTransient: true,
+ }
+
+ // send init response
+ initResponse := &messaging_pb.PublishResponse{
+ Config: nil,
+ Redirect: nil,
+ }
+ err = stream.Send(initResponse)
+ if err != nil {
+ return err
+ }
+ if initResponse.Redirect != nil {
+ return nil
+ }
+
+ // get lock
+ tp := TopicPartition{
+ Namespace: in.Init.Namespace,
+ Topic: in.Init.Topic,
+ Partition: in.Init.Partition,
+ }
+
+ tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
+ md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
+ // println("chan data stored under", tpDir, "as", md5File)
+
+ if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
+ return fmt.Errorf("channel is already closed")
+ }
+
+ tl := broker.topicManager.RequestLock(tp, topicConfig, true)
+ defer broker.topicManager.ReleaseLock(tp, true)
+
+ md5hash := md5.New()
+ // process each message
+ for {
+ // println("recv")
+ in, err := stream.Recv()
+ // glog.V(0).Infof("recieved %v err: %v", in, err)
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ if in.Data == nil {
+ continue
+ }
+
+ // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
+
+ data, err := proto.Marshal(in.Data)
+ if err != nil {
+ glog.Errorf("marshall error: %v\n", err)
+ continue
+ }
+
+ tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
+
+ if in.Data.IsClose {
+ // println("server received closing")
+ break
+ }
+
+ md5hash.Write(in.Data.Value)
+
+ }
+
+ if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
+ glog.V(0).Infof("err writing %s: %v", md5File, err)
+ }
+
+ // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
+
+ // send the close ack
+ // println("server send ack closing")
+ if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
+ glog.V(0).Infof("err sending close response: %v", err)
+ }
+ return nil
+
+}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
new file mode 100644
index 000000000..3021473e5
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -0,0 +1,177 @@
+package broker
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+ "io"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
+
+ // process initial request
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ var processedTsNs int64
+ var messageCount int64
+ subscriberId := in.Init.SubscriberId
+
+ // TODO look it up
+ topicConfig := &messaging_pb.TopicConfiguration{
+ // IsTransient: true,
+ }
+
+ // get lock
+ tp := TopicPartition{
+ Namespace: in.Init.Namespace,
+ Topic: in.Init.Topic,
+ Partition: in.Init.Partition,
+ }
+ fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
+ defer func() {
+ fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
+ }()
+
+ lock := broker.topicManager.RequestLock(tp, topicConfig, false)
+ defer broker.topicManager.ReleaseLock(tp, false)
+
+ isConnected := true
+ go func() {
+ for isConnected {
+ if _, err := stream.Recv(); err != nil {
+ // println("disconnecting connection to", subscriberId, tp.String())
+ isConnected = false
+ lock.cond.Signal()
+ }
+ }
+ }()
+
+ lastReadTime := time.Now()
+ switch in.Init.StartPosition {
+ case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
+ lastReadTime = time.Unix(0, in.Init.TimestampNs)
+ case messaging_pb.SubscriberMessage_InitMessage_LATEST:
+ case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
+ lastReadTime = time.Unix(0, 0)
+ }
+
+ // how to process each message
+ // an error returned will end the subscription
+ eachMessageFn := func(m *messaging_pb.Message) error {
+ err := stream.Send(&messaging_pb.BrokerMessage{
+ Data: m,
+ })
+ if err != nil {
+ glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
+ }
+ return err
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
+ m := &messaging_pb.Message{}
+ if err = proto.Unmarshal(logEntry.Data, m); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ return err
+ }
+ // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
+ if err = eachMessageFn(m); err != nil {
+ glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
+ return err
+ }
+ if m.IsClose {
+ // println("processed EOF")
+ return io.EOF
+ }
+ processedTsNs = logEntry.TsNs
+ messageCount++
+ return nil
+ }
+
+ // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
+
+ for {
+
+ if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ if err != io.EOF {
+ // println("stopping from persisted logs", err.Error())
+ return err
+ }
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
+ lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ lock.Mutex.Lock()
+ lock.cond.Wait()
+ lock.Mutex.Unlock()
+ return isConnected
+ }, eachLogEntryFn)
+ if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
+
+ return err
+
+}
+
+func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ startTime = startTime.UTC()
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
+ startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+
+ sizeBuf := make([]byte, 4)
+ startTsNs := startTime.UnixNano()
+
+ topicDir := genTopicDir(tp.Namespace, tp.Topic)
+ partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
+
+ return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
+ dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
+ return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
+ if dayEntry.Name == startDate {
+ if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
+ return nil
+ }
+ }
+ if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
+ return nil
+ }
+ // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
+ chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
+ defer chunkedFileReader.Close()
+ if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ chunkedFileReader.Close()
+ if err == io.EOF {
+ return err
+ }
+ return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
+ }
+ return nil
+ }, "", false, 24*60)
+ }, startDate, true, 366)
+
+}
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
new file mode 100644
index 000000000..06162471c
--- /dev/null
+++ b/weed/messaging/broker/broker_server.go
@@ -0,0 +1,114 @@
+package broker
+
+import (
+ "context"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+type MessageBrokerOption struct {
+ Filers []string
+ DefaultReplication string
+ MaxMB int
+ Ip string
+ Port int
+ Cipher bool
+}
+
+type MessageBroker struct {
+ option *MessageBrokerOption
+ grpcDialOption grpc.DialOption
+ topicManager *TopicManager
+}
+
+func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
+
+ messageBroker = &MessageBroker{
+ option: option,
+ grpcDialOption: grpcDialOption,
+ }
+
+ messageBroker.topicManager = NewTopicManager(messageBroker)
+
+ messageBroker.checkFilers()
+
+ go messageBroker.keepConnectedToOneFiler()
+
+ return messageBroker, nil
+}
+
+func (broker *MessageBroker) keepConnectedToOneFiler() {
+
+ for {
+ for _, filer := range broker.option.Filers {
+ broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.KeepConnected(ctx)
+ if err != nil {
+ glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+
+ initRequest := &filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }
+ for _, tp := range broker.topicManager.ListTopicPartitions() {
+ initRequest.Resources = append(initRequest.Resources, tp.String())
+ }
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+
+ // TODO send events of adding/removing topics
+
+ glog.V(0).Infof("conntected with filer: %v", filer)
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("send heartbeat")
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("received reply")
+ time.Sleep(11 * time.Second)
+ // println("woke up")
+ }
+ return nil
+ })
+ time.Sleep(3 * time.Second)
+ }
+ }
+
+}
+
+func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+
+}
+
+func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return fn(client)
+ })
+
+}
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go
new file mode 100644
index 000000000..465a2a8f2
--- /dev/null
+++ b/weed/messaging/broker/consistent_distribution.go
@@ -0,0 +1,38 @@
+package broker
+
+import (
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+)
+
+type Member string
+
+func (m Member) String() string {
+ return string(m)
+}
+
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+func PickMember(members []string, key []byte) string {
+ cfg := consistent.Config{
+ PartitionCount: 9791,
+ ReplicationFactor: 2,
+ Load: 1.25,
+ Hasher: hasher{},
+ }
+
+ cmembers := []consistent.Member{}
+ for _, m := range members {
+ cmembers = append(cmembers, Member(m))
+ }
+
+ c := consistent.New(cmembers, cfg)
+
+ m := c.LocateKey(key)
+
+ return m.String()
+}
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go
new file mode 100644
index 000000000..f58fe4e0e
--- /dev/null
+++ b/weed/messaging/broker/consistent_distribution_test.go
@@ -0,0 +1,32 @@
+package broker
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestPickMember(t *testing.T) {
+
+ servers := []string{
+ "s1:port",
+ "s2:port",
+ "s3:port",
+ "s5:port",
+ "s4:port",
+ }
+
+ total := 1000
+
+ distribution := make(map[string]int)
+ for i := 0; i < total; i++ {
+ tp := fmt.Sprintf("tp:%2d", i)
+ m := PickMember(servers, []byte(tp))
+ // println(tp, "=>", m)
+ distribution[m]++
+ }
+
+ for member, count := range distribution {
+ fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
+ }
+
+}
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
new file mode 100644
index 000000000..edddca813
--- /dev/null
+++ b/weed/messaging/broker/topic_manager.go
@@ -0,0 +1,124 @@
+package broker
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+type TopicPartition struct {
+ Namespace string
+ Topic string
+ Partition int32
+}
+
+const (
+ TopicPartitionFmt = "%s/%s_%02d"
+)
+
+func (tp *TopicPartition) String() string {
+ return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
+}
+
+type TopicControl struct {
+ sync.Mutex
+ cond *sync.Cond
+ subscriberCount int
+ publisherCount int
+ logBuffer *log_buffer.LogBuffer
+}
+
+type TopicManager struct {
+ sync.Mutex
+ topicControls map[TopicPartition]*TopicControl
+ broker *MessageBroker
+}
+
+func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
+ return &TopicManager{
+ topicControls: make(map[TopicPartition]*TopicControl),
+ broker: messageBroker,
+ }
+}
+
+func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+
+ flushFn := func(startTime, stopTime time.Time, buf []byte) {
+
+ if topicConfig.IsTransient {
+ // return
+ }
+
+ // fmt.Printf("flushing with topic config %+v\n", topicConfig)
+
+ startTime, stopTime = startTime.UTC(), stopTime.UTC()
+ targetFile := fmt.Sprintf(
+ "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ filer.TopicsDir, tp.Namespace, tp.Topic,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ tp.Partition,
+ )
+
+ if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+ }
+ logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
+ tl.cond.Broadcast()
+ })
+
+ return logBuffer
+}
+
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
+ tm.Lock()
+ defer tm.Unlock()
+
+ tc, found := tm.topicControls[partition]
+ if !found {
+ tc = &TopicControl{}
+ tc.cond = sync.NewCond(&tc.Mutex)
+ tm.topicControls[partition] = tc
+ tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
+ }
+ if isPublisher {
+ tc.publisherCount++
+ } else {
+ tc.subscriberCount++
+ }
+ return tc
+}
+
+func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
+ tm.Lock()
+ defer tm.Unlock()
+
+ lock, found := tm.topicControls[partition]
+ if !found {
+ return
+ }
+ if isPublisher {
+ lock.publisherCount--
+ } else {
+ lock.subscriberCount--
+ }
+ if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
+ delete(tm.topicControls, partition)
+ lock.logBuffer.Shutdown()
+ }
+}
+
+func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
+ tm.Lock()
+ defer tm.Unlock()
+
+ for k := range tm.topicControls {
+ tps = append(tps, k)
+ }
+ return
+}
diff --git a/weed/messaging/msgclient/chan_config.go b/weed/messaging/msgclient/chan_config.go
new file mode 100644
index 000000000..a75678815
--- /dev/null
+++ b/weed/messaging/msgclient/chan_config.go
@@ -0,0 +1,5 @@
+package msgclient
+
+func (mc *MessagingClient) DeleteChannel(chanName string) error {
+ return mc.DeleteTopic("chan", chanName)
+}
diff --git a/weed/messaging/msgclient/chan_pub.go b/weed/messaging/msgclient/chan_pub.go
new file mode 100644
index 000000000..9bc88f7c0
--- /dev/null
+++ b/weed/messaging/msgclient/chan_pub.go
@@ -0,0 +1,76 @@
+package msgclient
+
+import (
+ "crypto/md5"
+ "hash"
+ "io"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type PubChannel struct {
+ client messaging_pb.SeaweedMessaging_PublishClient
+ grpcConnection *grpc.ClientConn
+ md5hash hash.Hash
+}
+
+func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ pc, err := setupPublisherClient(grpcConnection, tp)
+ if err != nil {
+ return nil, err
+ }
+ return &PubChannel{
+ client: pc,
+ grpcConnection: grpcConnection,
+ md5hash: md5.New(),
+ }, nil
+}
+
+func (pc *PubChannel) Publish(m []byte) error {
+ err := pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ Value: m,
+ },
+ })
+ if err == nil {
+ pc.md5hash.Write(m)
+ }
+ return err
+}
+func (pc *PubChannel) Close() error {
+
+ // println("send closing")
+ if err := pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ IsClose: true,
+ },
+ }); err != nil {
+ log.Printf("err send close: %v", err)
+ }
+ // println("receive closing")
+ if _, err := pc.client.Recv(); err != nil && err != io.EOF {
+ log.Printf("err receive close: %v", err)
+ }
+ // println("close connection")
+ if err := pc.grpcConnection.Close(); err != nil {
+ log.Printf("err connection close: %v", err)
+ }
+ return nil
+}
+
+func (pc *PubChannel) Md5() []byte {
+ return pc.md5hash.Sum(nil)
+}
diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/messaging/msgclient/chan_sub.go
new file mode 100644
index 000000000..213ff4666
--- /dev/null
+++ b/weed/messaging/msgclient/chan_sub.go
@@ -0,0 +1,85 @@
+package msgclient
+
+import (
+ "context"
+ "crypto/md5"
+ "hash"
+ "io"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type SubChannel struct {
+ ch chan []byte
+ stream messaging_pb.SeaweedMessaging_SubscribeClient
+ md5hash hash.Hash
+ cancel context.CancelFunc
+}
+
+func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
+ if err != nil {
+ return nil, err
+ }
+
+ t := &SubChannel{
+ ch: make(chan []byte),
+ stream: sc,
+ md5hash: md5.New(),
+ cancel: cancel,
+ }
+
+ go func() {
+ for {
+ resp, subErr := t.stream.Recv()
+ if subErr == io.EOF {
+ return
+ }
+ if subErr != nil {
+ log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
+ return
+ }
+ if resp.Data == nil {
+ // this could be heartbeat from broker
+ continue
+ }
+ if resp.Data.IsClose {
+ t.stream.Send(&messaging_pb.SubscriberMessage{
+ IsClose: true,
+ })
+ close(t.ch)
+ cancel()
+ return
+ }
+ t.ch <- resp.Data.Value
+ t.md5hash.Write(resp.Data.Value)
+ }
+ }()
+
+ return t, nil
+}
+
+func (sc *SubChannel) Channel() chan []byte {
+ return sc.ch
+}
+
+func (sc *SubChannel) Md5() []byte {
+ return sc.md5hash.Sum(nil)
+}
+
+func (sc *SubChannel) Cancel() {
+ sc.cancel()
+}
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
new file mode 100644
index 000000000..4d7ef2b8e
--- /dev/null
+++ b/weed/messaging/msgclient/client.go
@@ -0,0 +1,55 @@
+package msgclient
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessagingClient struct {
+ bootstrapBrokers []string
+ grpcConnections map[broker.TopicPartition]*grpc.ClientConn
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
+ return &MessagingClient{
+ bootstrapBrokers: bootstrapBrokers,
+ grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
+ }
+}
+
+func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
+
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
+ &messaging_pb.FindBrokerRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Parition: tp.Partition,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ targetBroker := resp.Broker
+ return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
+ }
+ return nil, fmt.Errorf("no broker found for %+v", tp)
+}
diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go
new file mode 100644
index 000000000..2b9eba1a8
--- /dev/null
+++ b/weed/messaging/msgclient/config.go
@@ -0,0 +1,63 @@
+package msgclient
+
+import (
+ "context"
+ "log"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
+
+ return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(),
+ &messaging_pb.ConfigureTopicRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Configuration: &messaging_pb.TopicConfiguration{
+ PartitionCount: 0,
+ Collection: "",
+ Replication: "",
+ IsTransient: false,
+ Partitoning: 0,
+ },
+ })
+ return err
+ })
+
+}
+
+func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
+
+ return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ _, err := client.DeleteTopic(context.Background(),
+ &messaging_pb.DeleteTopicRequest{
+ Namespace: namespace,
+ Topic: topic,
+ })
+ return err
+ })
+}
+
+func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+
+ var lastErr error
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
+ if err == nil {
+ return nil
+ }
+ lastErr = err
+ }
+
+ return lastErr
+}
diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go
new file mode 100644
index 000000000..1aa483ff8
--- /dev/null
+++ b/weed/messaging/msgclient/publisher.go
@@ -0,0 +1,118 @@
+package msgclient
+
+import (
+ "context"
+
+ "github.com/OneOfOne/xxhash"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type Publisher struct {
+ publishClients []messaging_pb.SeaweedMessaging_PublishClient
+ topicConfiguration *messaging_pb.TopicConfiguration
+ messageCount uint64
+ publisherId string
+}
+
+func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ tp := broker.TopicPartition{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: int32(i),
+ }
+ grpcClientConn, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ client, err := setupPublisherClient(grpcClientConn, tp)
+ if err != nil {
+ return nil, err
+ }
+ publishClients[i] = client
+ }
+ return &Publisher{
+ publishClients: publishClients,
+ topicConfiguration: topicConfiguration,
+ }, nil
+}
+
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
+
+ stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.PublishRequest{
+ Init: &messaging_pb.PublishRequest_InitMessage{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return nil, err
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ if initResponse.Config != nil {
+ }
+
+ // setup looks for control messages
+ doneChan := make(chan error, 1)
+ go func() {
+ for {
+ in, err := stream.Recv()
+ if err != nil {
+ doneChan <- err
+ return
+ }
+ if in.Redirect != nil {
+ }
+ if in.Config != nil {
+ }
+ }
+ }()
+
+ return stream, nil
+
+}
+
+func (p *Publisher) Publish(m *messaging_pb.Message) error {
+ hashValue := p.messageCount
+ p.messageCount++
+ if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
+ if m.Key != nil {
+ hashValue = xxhash.Checksum64(m.Key)
+ }
+ } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
+ hashValue = xxhash.Checksum64(m.Key)
+ } else {
+ // round robin
+ }
+
+ idx := int(hashValue) % len(p.publishClients)
+ if idx < 0 {
+ idx += len(p.publishClients)
+ }
+ return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
+ Data: m,
+ })
+}
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
new file mode 100644
index 000000000..6c7dc1ab7
--- /dev/null
+++ b/weed/messaging/msgclient/subscriber.go
@@ -0,0 +1,120 @@
+package msgclient
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "google.golang.org/grpc"
+)
+
+type Subscriber struct {
+ subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberCancels []context.CancelFunc
+ subscriberId string
+}
+
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+ subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
+
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ if partitionId >= 0 && i != partitionId {
+ continue
+ }
+ tp := broker.TopicPartition{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: int32(i),
+ }
+ grpcClientConn, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
+ if err != nil {
+ return nil, err
+ }
+ subscriberClients[i] = client
+ subscriberCancels[i] = cancel
+ }
+
+ return &Subscriber{
+ subscriberClients: subscriberClients,
+ subscriberCancels: subscriberCancels,
+ subscriberId: subscriberId,
+ }, nil
+}
+
+func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
+ if err != nil {
+ return
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.SubscriberMessage{
+ Init: &messaging_pb.SubscriberMessage_InitMessage{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
+ StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ TimestampNs: startTime.UnixNano(),
+ SubscriberId: subscriberId,
+ },
+ })
+ if err != nil {
+ return
+ }
+
+ return stream, nil
+}
+
+func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
+ for {
+ resp, listenErr := subscriberClient.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ println(listenErr.Error())
+ return listenErr
+ }
+ if resp.Data == nil {
+ // this could be heartbeat from broker
+ continue
+ }
+ processFn(resp.Data)
+ }
+}
+
+// Subscribe starts goroutines to process the messages
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+ var wg sync.WaitGroup
+ for i := 0; i < len(s.subscriberClients); i++ {
+ if s.subscriberClients[i] != nil {
+ wg.Add(1)
+ go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
+ defer wg.Done()
+ doSubscribe(subscriberClient, processFn)
+ }(s.subscriberClients[i])
+ }
+ }
+ wg.Wait()
+}
+
+func (s *Subscriber) Shutdown() {
+ for i := 0; i < len(s.subscriberClients); i++ {
+ if s.subscriberCancels[i] != nil {
+ s.subscriberCancels[i]()
+ }
+ }
+}