aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go130
-rw-r--r--weed/messaging/broker/broker_grpc_server.go37
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go122
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go112
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go178
-rw-r--r--weed/messaging/broker/broker_server.go116
-rw-r--r--weed/messaging/broker/consistent_distribution.go38
-rw-r--r--weed/messaging/broker/consistent_distribution_test.go32
-rw-r--r--weed/messaging/broker/topic_manager.go124
-rw-r--r--weed/messaging/msgclient/chan_config.go5
-rw-r--r--weed/messaging/msgclient/chan_pub.go76
-rw-r--r--weed/messaging/msgclient/chan_sub.go85
-rw-r--r--weed/messaging/msgclient/client.go55
-rw-r--r--weed/messaging/msgclient/config.go63
-rw-r--r--weed/messaging/msgclient/publisher.go118
-rw-r--r--weed/messaging/msgclient/subscriber.go120
16 files changed, 0 insertions, 1411 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
deleted file mode 100644
index 9a31a8ac0..000000000
--- a/weed/messaging/broker/broker_append.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
-
- assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
- if err2 != nil {
- return err2
- }
-
- dir, name := util.FullPath(targetFile).DirAndName()
-
- // append the chunk
- if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AppendToEntryRequest{
- Directory: dir,
- EntryName: name,
- Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
- }
-
- _, err := client.AppendToEntry(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("append to file %v: %v", request, err)
- return err
- }
-
- return nil
- }); err != nil {
- return fmt.Errorf("append to file %v: %v", targetFile, err)
- }
-
- return nil
-}
-
-func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
-
- var assignResult = &operation.AssignResult{}
-
- // assign a volume location
- if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- assignErr := util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: topicConfig.Replication,
- Collection: topicConfig.Collection,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- assignResult.Auth = security.EncodedJwt(resp.Auth)
- assignResult.Fid = resp.FileId
- assignResult.Url = resp.Location.Url
- assignResult.PublicUrl = resp.Location.PublicUrl
- assignResult.GrpcPort = int(resp.Location.GrpcPort)
- assignResult.Count = uint64(resp.Count)
-
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- return nil
- }); err != nil {
- return nil, nil, err
- }
-
- // upload data
- targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
- uploadOption := &operation.UploadOption{
- UploadUrl: targetUrl,
- Filename: "",
- Cipher: broker.option.Cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: assignResult.Auth,
- }
- uploadResult, err := operation.UploadData(data, uploadOption)
- if err != nil {
- return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
- }
- // println("uploaded to", targetUrl)
- return assignResult, uploadResult, nil
-}
-
-var _ = filer_pb.FilerClient(&MessageBroker{})
-
-func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
-
- for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
- if err == io.EOF {
- return
- }
- glog.V(0).Infof("fail to connect to %s: %v", filer, err)
- } else {
- break
- }
- }
-
- return
-
-}
-
-func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
deleted file mode 100644
index ba141fdd0..000000000
--- a/weed/messaging/broker/broker_grpc_server.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
- panic("implement me")
-}
-
-func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
- resp := &messaging_pb.DeleteTopicResponse{}
- dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
- if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
- return nil, err
- } else if exists {
- err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil)
- }
- return resp, nil
-}
-
-func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
- panic("implement me")
-}
-
-func genTopicDir(namespace, topic string) string {
- return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic)
-}
-
-func genTopicDirEntry(namespace, topic string) (dir, entry string) {
- return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic
-}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
deleted file mode 100644
index 5cd8edd33..000000000
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ /dev/null
@@ -1,122 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/cluster"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-/*
-Topic discovery:
-
-When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
-
-The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
-Otherwise, just host the topic.
-
-So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
-If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
-
-*/
-
-func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
-
- t := &messaging_pb.FindBrokerResponse{}
- var peers []string
-
- targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
-
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
- Resource: targetTopicPartition,
- })
- if err != nil {
- return err
- }
- if resp.Found && len(resp.Resources) > 0 {
- t.Broker = resp.Resources[0].GrpcAddresses
- return nil
- }
- for _, b := range resp.Resources {
- peers = append(peers, b.GrpcAddresses)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- }
-
- t.Broker = PickMember(peers, []byte(targetTopicPartition))
-
- return t, nil
-
-}
-
-func (broker *MessageBroker) checkFilers() {
-
- // contact a filer about masters
- var masters []pb.ServerAddress
- found := false
- for !found {
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return err
- }
- for _, m := range resp.Masters {
- masters = append(masters, pb.ServerAddress(m))
- }
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []pb.ServerAddress
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.FilerType,
- })
- if err != nil {
- return err
- }
-
- for _, clusterNode := range resp.ClusterNodes {
- filers = append(filers, pb.ServerAddress(clusterNode.Address))
- }
-
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
-
-}
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
deleted file mode 100644
index 6e6b723d1..000000000
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package broker
-
-import (
- "crypto/md5"
- "fmt"
- "io"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- // TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // send init response
- initResponse := &messaging_pb.PublishResponse{
- Config: nil,
- Redirect: nil,
- }
- err = stream.Send(initResponse)
- if err != nil {
- return err
- }
- if initResponse.Redirect != nil {
- return nil
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
-
- tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
- md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
- // println("chan data stored under", tpDir, "as", md5File)
-
- if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
- return fmt.Errorf("channel is already closed")
- }
-
- tl := broker.topicManager.RequestLock(tp, topicConfig, true)
- defer broker.topicManager.ReleaseLock(tp, true)
-
- md5hash := md5.New()
- // process each message
- for {
- // println("recv")
- in, err := stream.Recv()
- // glog.V(0).Infof("recieved %v err: %v", in, err)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- if in.Data == nil {
- continue
- }
-
- // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
-
- data, err := proto.Marshal(in.Data)
- if err != nil {
- glog.Errorf("marshall error: %v\n", err)
- continue
- }
-
- tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
-
- if in.Data.IsClose {
- // println("server received closing")
- break
- }
-
- md5hash.Write(in.Data.Value)
-
- }
-
- if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
- glog.V(0).Infof("err writing %s: %v", md5File, err)
- }
-
- // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
-
- // send the close ack
- // println("server send ack closing")
- if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
- glog.V(0).Infof("err sending close response: %v", err)
- }
- return nil
-
-}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
deleted file mode 100644
index 20d529239..000000000
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ /dev/null
@@ -1,178 +0,0 @@
-package broker
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
- "io"
- "strings"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- var processedTsNs int64
- var messageCount int64
- subscriberId := in.Init.SubscriberId
-
- // TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
- fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
- defer func() {
- fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
- }()
-
- lock := broker.topicManager.RequestLock(tp, topicConfig, false)
- defer broker.topicManager.ReleaseLock(tp, false)
-
- isConnected := true
- go func() {
- for isConnected {
- if _, err := stream.Recv(); err != nil {
- // println("disconnecting connection to", subscriberId, tp.String())
- isConnected = false
- lock.cond.Signal()
- }
- }
- }()
-
- lastReadTime := time.Now()
- switch in.Init.StartPosition {
- case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
- lastReadTime = time.Unix(0, in.Init.TimestampNs)
- case messaging_pb.SubscriberMessage_InitMessage_LATEST:
- case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
- lastReadTime = time.Unix(0, 0)
- }
-
- // how to process each message
- // an error returned will end the subscription
- eachMessageFn := func(m *messaging_pb.Message) error {
- err := stream.Send(&messaging_pb.BrokerMessage{
- Data: m,
- })
- if err != nil {
- glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
- }
- return err
- }
-
- eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
- m := &messaging_pb.Message{}
- if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
- return err
- }
- // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
- if err = eachMessageFn(m); err != nil {
- glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
- return err
- }
- if m.IsClose {
- // println("processed EOF")
- return io.EOF
- }
- processedTsNs = logEntry.TsNs
- messageCount++
- return nil
- }
-
- // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
-
- for {
-
- if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
- if err != io.EOF {
- // println("stopping from persisted logs", err.Error())
- return err
- }
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
-
- lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool {
- lock.Mutex.Lock()
- lock.cond.Wait()
- lock.Mutex.Unlock()
- return isConnected
- }, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
- continue
- }
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- if err != log_buffer.ResumeError {
- break
- }
- }
- }
-
- return err
-
-}
-
-func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
- startTime = startTime.UTC()
- startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
-
- sizeBuf := make([]byte, 4)
- startTsNs := startTime.UnixNano()
-
- topicDir := genTopicDir(tp.Namespace, tp.Topic)
- partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
-
- return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
- dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
- return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
- if dayEntry.Name == startDate {
- hourMinute := util.FileNameBase(hourMinuteEntry.Name)
- if strings.Compare(hourMinute, startHourMinute) < 0 {
- return nil
- }
- }
- if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
- return nil
- }
- // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
- chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
- defer chunkedFileReader.Close()
- if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil {
- chunkedFileReader.Close()
- if err == io.EOF {
- return err
- }
- return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
- }
- return nil
- }, "", false, 24*60)
- }, startDate, true, 366)
-
-}
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
deleted file mode 100644
index acf2d6d34..000000000
--- a/weed/messaging/broker/broker_server.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package broker
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "time"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-)
-
-type MessageBrokerOption struct {
- Filers []pb.ServerAddress
- DefaultReplication string
- MaxMB int
- Ip string
- Port int
- Cipher bool
-}
-
-type MessageBroker struct {
- messaging_pb.UnimplementedSeaweedMessagingServer
- option *MessageBrokerOption
- grpcDialOption grpc.DialOption
- topicManager *TopicManager
-}
-
-func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
-
- messageBroker = &MessageBroker{
- option: option,
- grpcDialOption: grpcDialOption,
- }
-
- messageBroker.topicManager = NewTopicManager(messageBroker)
-
- messageBroker.checkFilers()
-
- go messageBroker.keepConnectedToOneFiler()
-
- return messageBroker, nil
-}
-
-func (broker *MessageBroker) keepConnectedToOneFiler() {
-
- for {
- for _, filer := range broker.option.Filers {
- broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.KeepConnected(ctx)
- if err != nil {
- glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- initRequest := &filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }
- for _, tp := range broker.topicManager.ListTopicPartitions() {
- initRequest.Resources = append(initRequest.Resources, tp.String())
- }
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- // TODO send events of adding/removing topics
-
- glog.V(0).Infof("conntected with filer: %v", filer)
- for {
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("send heartbeat")
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("received reply")
- time.Sleep(11 * time.Second)
- // println("woke up")
- }
- return nil
- })
- time.Sleep(3 * time.Second)
- }
- }
-
-}
-
-func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
-
-}
-
-func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
-
- return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
- return fn(client)
- })
-
-}
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go
deleted file mode 100644
index 465a2a8f2..000000000
--- a/weed/messaging/broker/consistent_distribution.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package broker
-
-import (
- "github.com/buraksezer/consistent"
- "github.com/cespare/xxhash"
-)
-
-type Member string
-
-func (m Member) String() string {
- return string(m)
-}
-
-type hasher struct{}
-
-func (h hasher) Sum64(data []byte) uint64 {
- return xxhash.Sum64(data)
-}
-
-func PickMember(members []string, key []byte) string {
- cfg := consistent.Config{
- PartitionCount: 9791,
- ReplicationFactor: 2,
- Load: 1.25,
- Hasher: hasher{},
- }
-
- cmembers := []consistent.Member{}
- for _, m := range members {
- cmembers = append(cmembers, Member(m))
- }
-
- c := consistent.New(cmembers, cfg)
-
- m := c.LocateKey(key)
-
- return m.String()
-}
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go
deleted file mode 100644
index f58fe4e0e..000000000
--- a/weed/messaging/broker/consistent_distribution_test.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package broker
-
-import (
- "fmt"
- "testing"
-)
-
-func TestPickMember(t *testing.T) {
-
- servers := []string{
- "s1:port",
- "s2:port",
- "s3:port",
- "s5:port",
- "s4:port",
- }
-
- total := 1000
-
- distribution := make(map[string]int)
- for i := 0; i < total; i++ {
- tp := fmt.Sprintf("tp:%2d", i)
- m := PickMember(servers, []byte(tp))
- // println(tp, "=>", m)
- distribution[m]++
- }
-
- for member, count := range distribution {
- fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
- }
-
-}
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
deleted file mode 100644
index c303c29b3..000000000
--- a/weed/messaging/broker/topic_manager.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package broker
-
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
-)
-
-type TopicPartition struct {
- Namespace string
- Topic string
- Partition int32
-}
-
-const (
- TopicPartitionFmt = "%s/%s_%02d"
-)
-
-func (tp *TopicPartition) String() string {
- return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
-}
-
-type TopicControl struct {
- sync.Mutex
- cond *sync.Cond
- subscriberCount int
- publisherCount int
- logBuffer *log_buffer.LogBuffer
-}
-
-type TopicManager struct {
- sync.Mutex
- topicControls map[TopicPartition]*TopicControl
- broker *MessageBroker
-}
-
-func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
- return &TopicManager{
- topicControls: make(map[TopicPartition]*TopicControl),
- broker: messageBroker,
- }
-}
-
-func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
-
- flushFn := func(startTime, stopTime time.Time, buf []byte) {
-
- if topicConfig.IsTransient {
- // return
- }
-
- // fmt.Printf("flushing with topic config %+v\n", topicConfig)
-
- startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer.TopicsDir, tp.Namespace, tp.Topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- tp.Partition,
- )
-
- if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
- }
- logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() {
- tl.cond.Broadcast()
- })
-
- return logBuffer
-}
-
-func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
- tm.Lock()
- defer tm.Unlock()
-
- tc, found := tm.topicControls[partition]
- if !found {
- tc = &TopicControl{}
- tc.cond = sync.NewCond(&tc.Mutex)
- tm.topicControls[partition] = tc
- tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
- }
- if isPublisher {
- tc.publisherCount++
- } else {
- tc.subscriberCount++
- }
- return tc
-}
-
-func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
- tm.Lock()
- defer tm.Unlock()
-
- lock, found := tm.topicControls[partition]
- if !found {
- return
- }
- if isPublisher {
- lock.publisherCount--
- } else {
- lock.subscriberCount--
- }
- if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tm.topicControls, partition)
- lock.logBuffer.Shutdown()
- }
-}
-
-func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
- tm.Lock()
- defer tm.Unlock()
-
- for k := range tm.topicControls {
- tps = append(tps, k)
- }
- return
-}
diff --git a/weed/messaging/msgclient/chan_config.go b/weed/messaging/msgclient/chan_config.go
deleted file mode 100644
index a75678815..000000000
--- a/weed/messaging/msgclient/chan_config.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package msgclient
-
-func (mc *MessagingClient) DeleteChannel(chanName string) error {
- return mc.DeleteTopic("chan", chanName)
-}
diff --git a/weed/messaging/msgclient/chan_pub.go b/weed/messaging/msgclient/chan_pub.go
deleted file mode 100644
index 9bc88f7c0..000000000
--- a/weed/messaging/msgclient/chan_pub.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package msgclient
-
-import (
- "crypto/md5"
- "hash"
- "io"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-type PubChannel struct {
- client messaging_pb.SeaweedMessaging_PublishClient
- grpcConnection *grpc.ClientConn
- md5hash hash.Hash
-}
-
-func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
- tp := broker.TopicPartition{
- Namespace: "chan",
- Topic: chanName,
- Partition: 0,
- }
- grpcConnection, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- pc, err := setupPublisherClient(grpcConnection, tp)
- if err != nil {
- return nil, err
- }
- return &PubChannel{
- client: pc,
- grpcConnection: grpcConnection,
- md5hash: md5.New(),
- }, nil
-}
-
-func (pc *PubChannel) Publish(m []byte) error {
- err := pc.client.Send(&messaging_pb.PublishRequest{
- Data: &messaging_pb.Message{
- Value: m,
- },
- })
- if err == nil {
- pc.md5hash.Write(m)
- }
- return err
-}
-func (pc *PubChannel) Close() error {
-
- // println("send closing")
- if err := pc.client.Send(&messaging_pb.PublishRequest{
- Data: &messaging_pb.Message{
- IsClose: true,
- },
- }); err != nil {
- log.Printf("err send close: %v", err)
- }
- // println("receive closing")
- if _, err := pc.client.Recv(); err != nil && err != io.EOF {
- log.Printf("err receive close: %v", err)
- }
- // println("close connection")
- if err := pc.grpcConnection.Close(); err != nil {
- log.Printf("err connection close: %v", err)
- }
- return nil
-}
-
-func (pc *PubChannel) Md5() []byte {
- return pc.md5hash.Sum(nil)
-}
diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/messaging/msgclient/chan_sub.go
deleted file mode 100644
index 213ff4666..000000000
--- a/weed/messaging/msgclient/chan_sub.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package msgclient
-
-import (
- "context"
- "crypto/md5"
- "hash"
- "io"
- "log"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-type SubChannel struct {
- ch chan []byte
- stream messaging_pb.SeaweedMessaging_SubscribeClient
- md5hash hash.Hash
- cancel context.CancelFunc
-}
-
-func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
- tp := broker.TopicPartition{
- Namespace: "chan",
- Topic: chanName,
- Partition: 0,
- }
- grpcConnection, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
- sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
- if err != nil {
- return nil, err
- }
-
- t := &SubChannel{
- ch: make(chan []byte),
- stream: sc,
- md5hash: md5.New(),
- cancel: cancel,
- }
-
- go func() {
- for {
- resp, subErr := t.stream.Recv()
- if subErr == io.EOF {
- return
- }
- if subErr != nil {
- log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
- return
- }
- if resp.Data == nil {
- // this could be heartbeat from broker
- continue
- }
- if resp.Data.IsClose {
- t.stream.Send(&messaging_pb.SubscriberMessage{
- IsClose: true,
- })
- close(t.ch)
- cancel()
- return
- }
- t.ch <- resp.Data.Value
- t.md5hash.Write(resp.Data.Value)
- }
- }()
-
- return t, nil
-}
-
-func (sc *SubChannel) Channel() chan []byte {
- return sc.ch
-}
-
-func (sc *SubChannel) Md5() []byte {
- return sc.md5hash.Sum(nil)
-}
-
-func (sc *SubChannel) Cancel() {
- sc.cancel()
-}
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
deleted file mode 100644
index 4d7ef2b8e..000000000
--- a/weed/messaging/msgclient/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package msgclient
-
-import (
- "context"
- "fmt"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnections map[broker.TopicPartition]*grpc.ClientConn
- grpcDialOption grpc.DialOption
-}
-
-func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
- }
-}
-
-func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
-
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
- &messaging_pb.FindBrokerRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Parition: tp.Partition,
- })
- if err != nil {
- return nil, err
- }
-
- targetBroker := resp.Broker
- return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
- }
- return nil, fmt.Errorf("no broker found for %+v", tp)
-}
diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go
deleted file mode 100644
index 2b9eba1a8..000000000
--- a/weed/messaging/msgclient/config.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package msgclient
-
-import (
- "context"
- "log"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
-
- return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
- _, err := client.ConfigureTopic(context.Background(),
- &messaging_pb.ConfigureTopicRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Configuration: &messaging_pb.TopicConfiguration{
- PartitionCount: 0,
- Collection: "",
- Replication: "",
- IsTransient: false,
- Partitoning: 0,
- },
- })
- return err
- })
-
-}
-
-func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
-
- return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
- _, err := client.DeleteTopic(context.Background(),
- &messaging_pb.DeleteTopicRequest{
- Namespace: namespace,
- Topic: topic,
- })
- return err
- })
-}
-
-func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
-
- var lastErr error
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
- if err == nil {
- return nil
- }
- lastErr = err
- }
-
- return lastErr
-}
diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go
deleted file mode 100644
index 1aa483ff8..000000000
--- a/weed/messaging/msgclient/publisher.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package msgclient
-
-import (
- "context"
-
- "github.com/OneOfOne/xxhash"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-type Publisher struct {
- publishClients []messaging_pb.SeaweedMessaging_PublishClient
- topicConfiguration *messaging_pb.TopicConfiguration
- messageCount uint64
- publisherId string
-}
-
-func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
- // read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- client, err := setupPublisherClient(grpcClientConn, tp)
- if err != nil {
- return nil, err
- }
- publishClients[i] = client
- }
- return &Publisher{
- publishClients: publishClients,
- topicConfiguration: topicConfiguration,
- }, nil
-}
-
-func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
-
- stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&messaging_pb.PublishRequest{
- Init: &messaging_pb.PublishRequest_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
- if initResponse.Config != nil {
- }
-
- // setup looks for control messages
- doneChan := make(chan error, 1)
- go func() {
- for {
- in, err := stream.Recv()
- if err != nil {
- doneChan <- err
- return
- }
- if in.Redirect != nil {
- }
- if in.Config != nil {
- }
- }
- }()
-
- return stream, nil
-
-}
-
-func (p *Publisher) Publish(m *messaging_pb.Message) error {
- hashValue := p.messageCount
- p.messageCount++
- if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
- if m.Key != nil {
- hashValue = xxhash.Checksum64(m.Key)
- }
- } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
- hashValue = xxhash.Checksum64(m.Key)
- } else {
- // round robin
- }
-
- idx := int(hashValue) % len(p.publishClients)
- if idx < 0 {
- idx += len(p.publishClients)
- }
- return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
- Data: m,
- })
-}
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
deleted file mode 100644
index 6c7dc1ab7..000000000
--- a/weed/messaging/msgclient/subscriber.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package msgclient
-
-import (
- "context"
- "io"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "google.golang.org/grpc"
-)
-
-type Subscriber struct {
- subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
- subscriberCancels []context.CancelFunc
- subscriberId string
-}
-
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
- // read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
- subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
-
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- if partitionId >= 0 && i != partitionId {
- continue
- }
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
- client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
- if err != nil {
- return nil, err
- }
- subscriberClients[i] = client
- subscriberCancels[i] = cancel
- }
-
- return &Subscriber{
- subscriberClients: subscriberClients,
- subscriberCancels: subscriberCancels,
- subscriberId: subscriberId,
- }, nil
-}
-
-func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
- stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
- if err != nil {
- return
- }
-
- // send init message
- err = stream.Send(&messaging_pb.SubscriberMessage{
- Init: &messaging_pb.SubscriberMessage_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
- TimestampNs: startTime.UnixNano(),
- SubscriberId: subscriberId,
- },
- })
- if err != nil {
- return
- }
-
- return stream, nil
-}
-
-func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
- for {
- resp, listenErr := subscriberClient.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- println(listenErr.Error())
- return listenErr
- }
- if resp.Data == nil {
- // this could be heartbeat from broker
- continue
- }
- processFn(resp.Data)
- }
-}
-
-// Subscribe starts goroutines to process the messages
-func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
- var wg sync.WaitGroup
- for i := 0; i < len(s.subscriberClients); i++ {
- if s.subscriberClients[i] != nil {
- wg.Add(1)
- go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
- defer wg.Done()
- doSubscribe(subscriberClient, processFn)
- }(s.subscriberClients[i])
- }
- }
- wg.Wait()
-}
-
-func (s *Subscriber) Shutdown() {
- for i := 0; i < len(s.subscriberClients); i++ {
- if s.subscriberCancels[i] != nil {
- s.subscriberCancels[i]()
- }
- }
-}