diff options
Diffstat (limited to 'weed/messaging/broker')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 130 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server.go | 37 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 122 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 112 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 178 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 116 | ||||
| -rw-r--r-- | weed/messaging/broker/consistent_distribution.go | 38 | ||||
| -rw-r--r-- | weed/messaging/broker/consistent_distribution_test.go | 32 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_manager.go | 124 |
9 files changed, 0 insertions, 889 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go deleted file mode 100644 index 9a31a8ac0..000000000 --- a/weed/messaging/broker/broker_append.go +++ /dev/null @@ -1,130 +0,0 @@ -package broker - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "io" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error { - - assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) - if err2 != nil { - return err2 - } - - dir, name := util.FullPath(targetFile).DirAndName() - - // append the chunk - if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AppendToEntryRequest{ - Directory: dir, - EntryName: name, - Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)}, - } - - _, err := client.AppendToEntry(context.Background(), request) - if err != nil { - glog.V(0).Infof("append to file %v: %v", request, err) - return err - } - - return nil - }); err != nil { - return fmt.Errorf("append to file %v: %v", targetFile, err) - } - - return nil -} - -func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { - - var assignResult = &operation.AssignResult{} - - // assign a volume location - if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - assignErr := util.Retry("assignVolume", func() error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: topicConfig.Replication, - Collection: topicConfig.Collection, - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - assignResult.Auth = security.EncodedJwt(resp.Auth) - assignResult.Fid = resp.FileId - assignResult.Url = resp.Location.Url - assignResult.PublicUrl = resp.Location.PublicUrl - assignResult.GrpcPort = int(resp.Location.GrpcPort) - assignResult.Count = uint64(resp.Count) - - return nil - }) - if assignErr != nil { - return assignErr - } - - return nil - }); err != nil { - return nil, nil, err - } - - // upload data - targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) - uploadOption := &operation.UploadOption{ - UploadUrl: targetUrl, - Filename: "", - Cipher: broker.option.Cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - Jwt: assignResult.Auth, - } - uploadResult, err := operation.UploadData(data, uploadOption) - if err != nil { - return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) - } - // println("uploaded to", targetUrl) - return assignResult, uploadResult, nil -} - -var _ = filer_pb.FilerClient(&MessageBroker{}) - -func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { - - for _, filer := range broker.option.Filers { - if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { - if err == io.EOF { - return - } - glog.V(0).Infof("fail to connect to %s: %v", filer, err) - } else { - break - } - } - - return - -} - -func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go deleted file mode 100644 index ba141fdd0..000000000 --- a/weed/messaging/broker/broker_grpc_server.go +++ /dev/null @@ -1,37 +0,0 @@ -package broker - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) { - panic("implement me") -} - -func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) { - resp := &messaging_pb.DeleteTopicResponse{} - dir, entry := genTopicDirEntry(request.Namespace, request.Topic) - if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { - return nil, err - } else if exists { - err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil) - } - return resp, nil -} - -func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { - panic("implement me") -} - -func genTopicDir(namespace, topic string) string { - return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic) -} - -func genTopicDirEntry(namespace, topic string) (dir, entry string) { - return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic -} diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go deleted file mode 100644 index 5cd8edd33..000000000 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ /dev/null @@ -1,122 +0,0 @@ -package broker - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/cluster" - "github.com/chrislusf/seaweedfs/weed/pb" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -/* -Topic discovery: - -When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker. - -The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it. -Otherwise, just host the topic. - -So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy. -If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help. - -*/ - -func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) { - - t := &messaging_pb.FindBrokerResponse{} - var peers []string - - targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) - - for _, filer := range broker.option.Filers { - err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ - Resource: targetTopicPartition, - }) - if err != nil { - return err - } - if resp.Found && len(resp.Resources) > 0 { - t.Broker = resp.Resources[0].GrpcAddresses - return nil - } - for _, b := range resp.Resources { - peers = append(peers, b.GrpcAddresses) - } - return nil - }) - if err != nil { - return nil, err - } - } - - t.Broker = PickMember(peers, []byte(targetTopicPartition)) - - return t, nil - -} - -func (broker *MessageBroker) checkFilers() { - - // contact a filer about masters - var masters []pb.ServerAddress - found := false - for !found { - for _, filer := range broker.option.Filers { - err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err - } - for _, m := range resp.Masters { - masters = append(masters, pb.ServerAddress(m)) - } - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received master list: %s", masters) - - // contact each masters for filers - var filers []pb.ServerAddress - found = false - for !found { - for _, master := range masters { - err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.FilerType, - }) - if err != nil { - return err - } - - for _, clusterNode := range resp.ClusterNodes { - filers = append(filers, pb.ServerAddress(clusterNode.Address)) - } - - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to list filers: %v", err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received filer list: %s", filers) - - broker.option.Filers = filers - -} diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go deleted file mode 100644 index 6e6b723d1..000000000 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ /dev/null @@ -1,112 +0,0 @@ -package broker - -import ( - "crypto/md5" - "fmt" - "io" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error { - - // process initial request - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - // TODO look it up - topicConfig := &messaging_pb.TopicConfiguration{ - // IsTransient: true, - } - - // send init response - initResponse := &messaging_pb.PublishResponse{ - Config: nil, - Redirect: nil, - } - err = stream.Send(initResponse) - if err != nil { - return err - } - if initResponse.Redirect != nil { - return nil - } - - // get lock - tp := TopicPartition{ - Namespace: in.Init.Namespace, - Topic: in.Init.Topic, - Partition: in.Init.Partition, - } - - tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic) - md5File := fmt.Sprintf("p%02d.md5", tp.Partition) - // println("chan data stored under", tpDir, "as", md5File) - - if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists { - return fmt.Errorf("channel is already closed") - } - - tl := broker.topicManager.RequestLock(tp, topicConfig, true) - defer broker.topicManager.ReleaseLock(tp, true) - - md5hash := md5.New() - // process each message - for { - // println("recv") - in, err := stream.Recv() - // glog.V(0).Infof("recieved %v err: %v", in, err) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - if in.Data == nil { - continue - } - - // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value)) - - data, err := proto.Marshal(in.Data) - if err != nil { - glog.Errorf("marshall error: %v\n", err) - continue - } - - tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs) - - if in.Data.IsClose { - // println("server received closing") - break - } - - md5hash.Write(in.Data.Value) - - } - - if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil { - glog.V(0).Infof("err writing %s: %v", md5File, err) - } - - // fmt.Printf("received md5 %X\n", md5hash.Sum(nil)) - - // send the close ack - // println("server send ack closing") - if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil { - glog.V(0).Infof("err sending close response: %v", err) - } - return nil - -} diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go deleted file mode 100644 index 20d529239..000000000 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ /dev/null @@ -1,178 +0,0 @@ -package broker - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" - "io" - "strings" - "time" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { - - // process initial request - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - var processedTsNs int64 - var messageCount int64 - subscriberId := in.Init.SubscriberId - - // TODO look it up - topicConfig := &messaging_pb.TopicConfiguration{ - // IsTransient: true, - } - - // get lock - tp := TopicPartition{ - Namespace: in.Init.Namespace, - Topic: in.Init.Topic, - Partition: in.Init.Partition, - } - fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String()) - defer func() { - fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs)) - }() - - lock := broker.topicManager.RequestLock(tp, topicConfig, false) - defer broker.topicManager.ReleaseLock(tp, false) - - isConnected := true - go func() { - for isConnected { - if _, err := stream.Recv(); err != nil { - // println("disconnecting connection to", subscriberId, tp.String()) - isConnected = false - lock.cond.Signal() - } - } - }() - - lastReadTime := time.Now() - switch in.Init.StartPosition { - case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP: - lastReadTime = time.Unix(0, in.Init.TimestampNs) - case messaging_pb.SubscriberMessage_InitMessage_LATEST: - case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: - lastReadTime = time.Unix(0, 0) - } - - // how to process each message - // an error returned will end the subscription - eachMessageFn := func(m *messaging_pb.Message) error { - err := stream.Send(&messaging_pb.BrokerMessage{ - Data: m, - }) - if err != nil { - glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) - } - return err - } - - eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { - m := &messaging_pb.Message{} - if err = proto.Unmarshal(logEntry.Data, m); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) - return err - } - // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs) - if err = eachMessageFn(m); err != nil { - glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) - return err - } - if m.IsClose { - // println("processed EOF") - return io.EOF - } - processedTsNs = logEntry.TsNs - messageCount++ - return nil - } - - // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime) - - for { - - if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { - if err != io.EOF { - // println("stopping from persisted logs", err.Error()) - return err - } - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - - lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() - return isConnected - }, eachLogEntryFn) - if err != nil { - if err == log_buffer.ResumeFromDiskError { - continue - } - glog.Errorf("processed to %v: %v", lastReadTime, err) - if err != log_buffer.ResumeError { - break - } - } - } - - return err - -} - -func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { - startTime = startTime.UTC() - startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) - - sizeBuf := make([]byte, 4) - startTsNs := startTime.UnixNano() - - topicDir := genTopicDir(tp.Namespace, tp.Topic) - partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) - - return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { - dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) - return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { - if dayEntry.Name == startDate { - hourMinute := util.FileNameBase(hourMinuteEntry.Name) - if strings.Compare(hourMinute, startHourMinute) < 0 { - return nil - } - } - if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) { - return nil - } - // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) - chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) - defer chunkedFileReader.Close() - if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil { - chunkedFileReader.Close() - if err == io.EOF { - return err - } - return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) - } - return nil - }, "", false, 24*60) - }, startDate, true, 366) - -} diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go deleted file mode 100644 index acf2d6d34..000000000 --- a/weed/messaging/broker/broker_server.go +++ /dev/null @@ -1,116 +0,0 @@ -package broker - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "time" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" -) - -type MessageBrokerOption struct { - Filers []pb.ServerAddress - DefaultReplication string - MaxMB int - Ip string - Port int - Cipher bool -} - -type MessageBroker struct { - messaging_pb.UnimplementedSeaweedMessagingServer - option *MessageBrokerOption - grpcDialOption grpc.DialOption - topicManager *TopicManager -} - -func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { - - messageBroker = &MessageBroker{ - option: option, - grpcDialOption: grpcDialOption, - } - - messageBroker.topicManager = NewTopicManager(messageBroker) - - messageBroker.checkFilers() - - go messageBroker.keepConnectedToOneFiler() - - return messageBroker, nil -} - -func (broker *MessageBroker) keepConnectedToOneFiler() { - - for { - for _, filer := range broker.option.Filers { - broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.KeepConnected(ctx) - if err != nil { - glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - - initRequest := &filer_pb.KeepConnectedRequest{ - Name: broker.option.Ip, - GrpcPort: uint32(broker.option.Port), - } - for _, tp := range broker.topicManager.ListTopicPartitions() { - initRequest.Resources = append(initRequest.Resources, tp.String()) - } - if err := stream.Send(&filer_pb.KeepConnectedRequest{ - Name: broker.option.Ip, - GrpcPort: uint32(broker.option.Port), - }); err != nil { - glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - - // TODO send events of adding/removing topics - - glog.V(0).Infof("conntected with filer: %v", filer) - for { - if err := stream.Send(&filer_pb.KeepConnectedRequest{ - Name: broker.option.Ip, - GrpcPort: uint32(broker.option.Port), - }); err != nil { - glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - // println("send heartbeat") - if _, err := stream.Recv(); err != nil { - glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - // println("received reply") - time.Sleep(11 * time.Second) - // println("woke up") - } - return nil - }) - time.Sleep(3 * time.Second) - } - } - -} - -func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { - - return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) - -} - -func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - - return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { - return fn(client) - }) - -} diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go deleted file mode 100644 index 465a2a8f2..000000000 --- a/weed/messaging/broker/consistent_distribution.go +++ /dev/null @@ -1,38 +0,0 @@ -package broker - -import ( - "github.com/buraksezer/consistent" - "github.com/cespare/xxhash" -) - -type Member string - -func (m Member) String() string { - return string(m) -} - -type hasher struct{} - -func (h hasher) Sum64(data []byte) uint64 { - return xxhash.Sum64(data) -} - -func PickMember(members []string, key []byte) string { - cfg := consistent.Config{ - PartitionCount: 9791, - ReplicationFactor: 2, - Load: 1.25, - Hasher: hasher{}, - } - - cmembers := []consistent.Member{} - for _, m := range members { - cmembers = append(cmembers, Member(m)) - } - - c := consistent.New(cmembers, cfg) - - m := c.LocateKey(key) - - return m.String() -} diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go deleted file mode 100644 index f58fe4e0e..000000000 --- a/weed/messaging/broker/consistent_distribution_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package broker - -import ( - "fmt" - "testing" -) - -func TestPickMember(t *testing.T) { - - servers := []string{ - "s1:port", - "s2:port", - "s3:port", - "s5:port", - "s4:port", - } - - total := 1000 - - distribution := make(map[string]int) - for i := 0; i < total; i++ { - tp := fmt.Sprintf("tp:%2d", i) - m := PickMember(servers, []byte(tp)) - // println(tp, "=>", m) - distribution[m]++ - } - - for member, count := range distribution { - fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers))) - } - -} diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go deleted file mode 100644 index c303c29b3..000000000 --- a/weed/messaging/broker/topic_manager.go +++ /dev/null @@ -1,124 +0,0 @@ -package broker - -import ( - "fmt" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" -) - -type TopicPartition struct { - Namespace string - Topic string - Partition int32 -} - -const ( - TopicPartitionFmt = "%s/%s_%02d" -) - -func (tp *TopicPartition) String() string { - return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) -} - -type TopicControl struct { - sync.Mutex - cond *sync.Cond - subscriberCount int - publisherCount int - logBuffer *log_buffer.LogBuffer -} - -type TopicManager struct { - sync.Mutex - topicControls map[TopicPartition]*TopicControl - broker *MessageBroker -} - -func NewTopicManager(messageBroker *MessageBroker) *TopicManager { - return &TopicManager{ - topicControls: make(map[TopicPartition]*TopicControl), - broker: messageBroker, - } -} - -func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { - - flushFn := func(startTime, stopTime time.Time, buf []byte) { - - if topicConfig.IsTransient { - // return - } - - // fmt.Printf("flushing with topic config %+v\n", topicConfig) - - startTime, stopTime = startTime.UTC(), stopTime.UTC() - targetFile := fmt.Sprintf( - "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", - filer.TopicsDir, tp.Namespace, tp.Topic, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - tp.Partition, - ) - - if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil { - glog.V(0).Infof("log write failed %s: %v", targetFile, err) - } - } - logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() { - tl.cond.Broadcast() - }) - - return logBuffer -} - -func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { - tm.Lock() - defer tm.Unlock() - - tc, found := tm.topicControls[partition] - if !found { - tc = &TopicControl{} - tc.cond = sync.NewCond(&tc.Mutex) - tm.topicControls[partition] = tc - tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) - } - if isPublisher { - tc.publisherCount++ - } else { - tc.subscriberCount++ - } - return tc -} - -func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { - tm.Lock() - defer tm.Unlock() - - lock, found := tm.topicControls[partition] - if !found { - return - } - if isPublisher { - lock.publisherCount-- - } else { - lock.subscriberCount-- - } - if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { - delete(tm.topicControls, partition) - lock.logBuffer.Shutdown() - } -} - -func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { - tm.Lock() - defer tm.Unlock() - - for k := range tm.topicControls { - tps = append(tps, k) - } - return -} |
