aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker')
-rw-r--r--weed/messaging/broker/broker_append.go130
-rw-r--r--weed/messaging/broker/broker_grpc_server.go37
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go122
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go112
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go178
-rw-r--r--weed/messaging/broker/broker_server.go116
-rw-r--r--weed/messaging/broker/consistent_distribution.go38
-rw-r--r--weed/messaging/broker/consistent_distribution_test.go32
-rw-r--r--weed/messaging/broker/topic_manager.go124
9 files changed, 0 insertions, 889 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
deleted file mode 100644
index 9a31a8ac0..000000000
--- a/weed/messaging/broker/broker_append.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
-
- assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
- if err2 != nil {
- return err2
- }
-
- dir, name := util.FullPath(targetFile).DirAndName()
-
- // append the chunk
- if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AppendToEntryRequest{
- Directory: dir,
- EntryName: name,
- Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
- }
-
- _, err := client.AppendToEntry(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("append to file %v: %v", request, err)
- return err
- }
-
- return nil
- }); err != nil {
- return fmt.Errorf("append to file %v: %v", targetFile, err)
- }
-
- return nil
-}
-
-func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
-
- var assignResult = &operation.AssignResult{}
-
- // assign a volume location
- if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- assignErr := util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: topicConfig.Replication,
- Collection: topicConfig.Collection,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- assignResult.Auth = security.EncodedJwt(resp.Auth)
- assignResult.Fid = resp.FileId
- assignResult.Url = resp.Location.Url
- assignResult.PublicUrl = resp.Location.PublicUrl
- assignResult.GrpcPort = int(resp.Location.GrpcPort)
- assignResult.Count = uint64(resp.Count)
-
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- return nil
- }); err != nil {
- return nil, nil, err
- }
-
- // upload data
- targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
- uploadOption := &operation.UploadOption{
- UploadUrl: targetUrl,
- Filename: "",
- Cipher: broker.option.Cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: assignResult.Auth,
- }
- uploadResult, err := operation.UploadData(data, uploadOption)
- if err != nil {
- return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
- }
- // println("uploaded to", targetUrl)
- return assignResult, uploadResult, nil
-}
-
-var _ = filer_pb.FilerClient(&MessageBroker{})
-
-func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
-
- for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
- if err == io.EOF {
- return
- }
- glog.V(0).Infof("fail to connect to %s: %v", filer, err)
- } else {
- break
- }
- }
-
- return
-
-}
-
-func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
-}
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
deleted file mode 100644
index ba141fdd0..000000000
--- a/weed/messaging/broker/broker_grpc_server.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
- panic("implement me")
-}
-
-func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
- resp := &messaging_pb.DeleteTopicResponse{}
- dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
- if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
- return nil, err
- } else if exists {
- err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil)
- }
- return resp, nil
-}
-
-func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
- panic("implement me")
-}
-
-func genTopicDir(namespace, topic string) string {
- return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic)
-}
-
-func genTopicDirEntry(namespace, topic string) (dir, entry string) {
- return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic
-}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
deleted file mode 100644
index 5cd8edd33..000000000
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ /dev/null
@@ -1,122 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/cluster"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-/*
-Topic discovery:
-
-When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
-
-The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
-Otherwise, just host the topic.
-
-So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
-If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
-
-*/
-
-func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
-
- t := &messaging_pb.FindBrokerResponse{}
- var peers []string
-
- targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
-
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
- Resource: targetTopicPartition,
- })
- if err != nil {
- return err
- }
- if resp.Found && len(resp.Resources) > 0 {
- t.Broker = resp.Resources[0].GrpcAddresses
- return nil
- }
- for _, b := range resp.Resources {
- peers = append(peers, b.GrpcAddresses)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- }
-
- t.Broker = PickMember(peers, []byte(targetTopicPartition))
-
- return t, nil
-
-}
-
-func (broker *MessageBroker) checkFilers() {
-
- // contact a filer about masters
- var masters []pb.ServerAddress
- found := false
- for !found {
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return err
- }
- for _, m := range resp.Masters {
- masters = append(masters, pb.ServerAddress(m))
- }
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []pb.ServerAddress
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.FilerType,
- })
- if err != nil {
- return err
- }
-
- for _, clusterNode := range resp.ClusterNodes {
- filers = append(filers, pb.ServerAddress(clusterNode.Address))
- }
-
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
-
-}
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
deleted file mode 100644
index 6e6b723d1..000000000
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package broker
-
-import (
- "crypto/md5"
- "fmt"
- "io"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- // TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // send init response
- initResponse := &messaging_pb.PublishResponse{
- Config: nil,
- Redirect: nil,
- }
- err = stream.Send(initResponse)
- if err != nil {
- return err
- }
- if initResponse.Redirect != nil {
- return nil
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
-
- tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
- md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
- // println("chan data stored under", tpDir, "as", md5File)
-
- if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
- return fmt.Errorf("channel is already closed")
- }
-
- tl := broker.topicManager.RequestLock(tp, topicConfig, true)
- defer broker.topicManager.ReleaseLock(tp, true)
-
- md5hash := md5.New()
- // process each message
- for {
- // println("recv")
- in, err := stream.Recv()
- // glog.V(0).Infof("recieved %v err: %v", in, err)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- if in.Data == nil {
- continue
- }
-
- // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
-
- data, err := proto.Marshal(in.Data)
- if err != nil {
- glog.Errorf("marshall error: %v\n", err)
- continue
- }
-
- tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
-
- if in.Data.IsClose {
- // println("server received closing")
- break
- }
-
- md5hash.Write(in.Data.Value)
-
- }
-
- if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
- glog.V(0).Infof("err writing %s: %v", md5File, err)
- }
-
- // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
-
- // send the close ack
- // println("server send ack closing")
- if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
- glog.V(0).Infof("err sending close response: %v", err)
- }
- return nil
-
-}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
deleted file mode 100644
index 20d529239..000000000
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ /dev/null
@@ -1,178 +0,0 @@
-package broker
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
- "io"
- "strings"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- var processedTsNs int64
- var messageCount int64
- subscriberId := in.Init.SubscriberId
-
- // TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
- // IsTransient: true,
- }
-
- // get lock
- tp := TopicPartition{
- Namespace: in.Init.Namespace,
- Topic: in.Init.Topic,
- Partition: in.Init.Partition,
- }
- fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
- defer func() {
- fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
- }()
-
- lock := broker.topicManager.RequestLock(tp, topicConfig, false)
- defer broker.topicManager.ReleaseLock(tp, false)
-
- isConnected := true
- go func() {
- for isConnected {
- if _, err := stream.Recv(); err != nil {
- // println("disconnecting connection to", subscriberId, tp.String())
- isConnected = false
- lock.cond.Signal()
- }
- }
- }()
-
- lastReadTime := time.Now()
- switch in.Init.StartPosition {
- case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
- lastReadTime = time.Unix(0, in.Init.TimestampNs)
- case messaging_pb.SubscriberMessage_InitMessage_LATEST:
- case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
- lastReadTime = time.Unix(0, 0)
- }
-
- // how to process each message
- // an error returned will end the subscription
- eachMessageFn := func(m *messaging_pb.Message) error {
- err := stream.Send(&messaging_pb.BrokerMessage{
- Data: m,
- })
- if err != nil {
- glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
- }
- return err
- }
-
- eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
- m := &messaging_pb.Message{}
- if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
- return err
- }
- // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
- if err = eachMessageFn(m); err != nil {
- glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
- return err
- }
- if m.IsClose {
- // println("processed EOF")
- return io.EOF
- }
- processedTsNs = logEntry.TsNs
- messageCount++
- return nil
- }
-
- // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
-
- for {
-
- if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
- if err != io.EOF {
- // println("stopping from persisted logs", err.Error())
- return err
- }
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
-
- lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool {
- lock.Mutex.Lock()
- lock.cond.Wait()
- lock.Mutex.Unlock()
- return isConnected
- }, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
- continue
- }
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- if err != log_buffer.ResumeError {
- break
- }
- }
- }
-
- return err
-
-}
-
-func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
- startTime = startTime.UTC()
- startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
-
- sizeBuf := make([]byte, 4)
- startTsNs := startTime.UnixNano()
-
- topicDir := genTopicDir(tp.Namespace, tp.Topic)
- partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
-
- return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
- dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
- return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
- if dayEntry.Name == startDate {
- hourMinute := util.FileNameBase(hourMinuteEntry.Name)
- if strings.Compare(hourMinute, startHourMinute) < 0 {
- return nil
- }
- }
- if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
- return nil
- }
- // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
- chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
- defer chunkedFileReader.Close()
- if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil {
- chunkedFileReader.Close()
- if err == io.EOF {
- return err
- }
- return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
- }
- return nil
- }, "", false, 24*60)
- }, startDate, true, 366)
-
-}
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
deleted file mode 100644
index acf2d6d34..000000000
--- a/weed/messaging/broker/broker_server.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package broker
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "time"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-)
-
-type MessageBrokerOption struct {
- Filers []pb.ServerAddress
- DefaultReplication string
- MaxMB int
- Ip string
- Port int
- Cipher bool
-}
-
-type MessageBroker struct {
- messaging_pb.UnimplementedSeaweedMessagingServer
- option *MessageBrokerOption
- grpcDialOption grpc.DialOption
- topicManager *TopicManager
-}
-
-func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
-
- messageBroker = &MessageBroker{
- option: option,
- grpcDialOption: grpcDialOption,
- }
-
- messageBroker.topicManager = NewTopicManager(messageBroker)
-
- messageBroker.checkFilers()
-
- go messageBroker.keepConnectedToOneFiler()
-
- return messageBroker, nil
-}
-
-func (broker *MessageBroker) keepConnectedToOneFiler() {
-
- for {
- for _, filer := range broker.option.Filers {
- broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.KeepConnected(ctx)
- if err != nil {
- glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- initRequest := &filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }
- for _, tp := range broker.topicManager.ListTopicPartitions() {
- initRequest.Resources = append(initRequest.Resources, tp.String())
- }
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
-
- // TODO send events of adding/removing topics
-
- glog.V(0).Infof("conntected with filer: %v", filer)
- for {
- if err := stream.Send(&filer_pb.KeepConnectedRequest{
- Name: broker.option.Ip,
- GrpcPort: uint32(broker.option.Port),
- }); err != nil {
- glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("send heartbeat")
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
- return err
- }
- // println("received reply")
- time.Sleep(11 * time.Second)
- // println("woke up")
- }
- return nil
- })
- time.Sleep(3 * time.Second)
- }
- }
-
-}
-
-func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
-
-}
-
-func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
-
- return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
- return fn(client)
- })
-
-}
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go
deleted file mode 100644
index 465a2a8f2..000000000
--- a/weed/messaging/broker/consistent_distribution.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package broker
-
-import (
- "github.com/buraksezer/consistent"
- "github.com/cespare/xxhash"
-)
-
-type Member string
-
-func (m Member) String() string {
- return string(m)
-}
-
-type hasher struct{}
-
-func (h hasher) Sum64(data []byte) uint64 {
- return xxhash.Sum64(data)
-}
-
-func PickMember(members []string, key []byte) string {
- cfg := consistent.Config{
- PartitionCount: 9791,
- ReplicationFactor: 2,
- Load: 1.25,
- Hasher: hasher{},
- }
-
- cmembers := []consistent.Member{}
- for _, m := range members {
- cmembers = append(cmembers, Member(m))
- }
-
- c := consistent.New(cmembers, cfg)
-
- m := c.LocateKey(key)
-
- return m.String()
-}
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go
deleted file mode 100644
index f58fe4e0e..000000000
--- a/weed/messaging/broker/consistent_distribution_test.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package broker
-
-import (
- "fmt"
- "testing"
-)
-
-func TestPickMember(t *testing.T) {
-
- servers := []string{
- "s1:port",
- "s2:port",
- "s3:port",
- "s5:port",
- "s4:port",
- }
-
- total := 1000
-
- distribution := make(map[string]int)
- for i := 0; i < total; i++ {
- tp := fmt.Sprintf("tp:%2d", i)
- m := PickMember(servers, []byte(tp))
- // println(tp, "=>", m)
- distribution[m]++
- }
-
- for member, count := range distribution {
- fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
- }
-
-}
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
deleted file mode 100644
index c303c29b3..000000000
--- a/weed/messaging/broker/topic_manager.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package broker
-
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
-)
-
-type TopicPartition struct {
- Namespace string
- Topic string
- Partition int32
-}
-
-const (
- TopicPartitionFmt = "%s/%s_%02d"
-)
-
-func (tp *TopicPartition) String() string {
- return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
-}
-
-type TopicControl struct {
- sync.Mutex
- cond *sync.Cond
- subscriberCount int
- publisherCount int
- logBuffer *log_buffer.LogBuffer
-}
-
-type TopicManager struct {
- sync.Mutex
- topicControls map[TopicPartition]*TopicControl
- broker *MessageBroker
-}
-
-func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
- return &TopicManager{
- topicControls: make(map[TopicPartition]*TopicControl),
- broker: messageBroker,
- }
-}
-
-func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
-
- flushFn := func(startTime, stopTime time.Time, buf []byte) {
-
- if topicConfig.IsTransient {
- // return
- }
-
- // fmt.Printf("flushing with topic config %+v\n", topicConfig)
-
- startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer.TopicsDir, tp.Namespace, tp.Topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- tp.Partition,
- )
-
- if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
- }
- logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() {
- tl.cond.Broadcast()
- })
-
- return logBuffer
-}
-
-func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
- tm.Lock()
- defer tm.Unlock()
-
- tc, found := tm.topicControls[partition]
- if !found {
- tc = &TopicControl{}
- tc.cond = sync.NewCond(&tc.Mutex)
- tm.topicControls[partition] = tc
- tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
- }
- if isPublisher {
- tc.publisherCount++
- } else {
- tc.subscriberCount++
- }
- return tc
-}
-
-func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
- tm.Lock()
- defer tm.Unlock()
-
- lock, found := tm.topicControls[partition]
- if !found {
- return
- }
- if isPublisher {
- lock.publisherCount--
- } else {
- lock.subscriberCount--
- }
- if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tm.topicControls, partition)
- lock.logBuffer.Shutdown()
- }
-}
-
-func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
- tm.Lock()
- defer tm.Unlock()
-
- for k := range tm.topicControls {
- tps = append(tps, k)
- }
- return
-}