diff options
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 130 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server.go | 37 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 122 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 112 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 178 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 116 | ||||
| -rw-r--r-- | weed/messaging/broker/consistent_distribution.go | 38 | ||||
| -rw-r--r-- | weed/messaging/broker/consistent_distribution_test.go | 32 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_manager.go | 124 | ||||
| -rw-r--r-- | weed/messaging/msgclient/chan_config.go | 5 | ||||
| -rw-r--r-- | weed/messaging/msgclient/chan_pub.go | 76 | ||||
| -rw-r--r-- | weed/messaging/msgclient/chan_sub.go | 85 | ||||
| -rw-r--r-- | weed/messaging/msgclient/client.go | 55 | ||||
| -rw-r--r-- | weed/messaging/msgclient/config.go | 63 | ||||
| -rw-r--r-- | weed/messaging/msgclient/publisher.go | 118 | ||||
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 120 |
16 files changed, 0 insertions, 1411 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go deleted file mode 100644 index 9a31a8ac0..000000000 --- a/weed/messaging/broker/broker_append.go +++ /dev/null @@ -1,130 +0,0 @@ -package broker - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "io" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error { - - assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) - if err2 != nil { - return err2 - } - - dir, name := util.FullPath(targetFile).DirAndName() - - // append the chunk - if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AppendToEntryRequest{ - Directory: dir, - EntryName: name, - Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)}, - } - - _, err := client.AppendToEntry(context.Background(), request) - if err != nil { - glog.V(0).Infof("append to file %v: %v", request, err) - return err - } - - return nil - }); err != nil { - return fmt.Errorf("append to file %v: %v", targetFile, err) - } - - return nil -} - -func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { - - var assignResult = &operation.AssignResult{} - - // assign a volume location - if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - - assignErr := util.Retry("assignVolume", func() error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: topicConfig.Replication, - Collection: topicConfig.Collection, - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - assignResult.Auth = security.EncodedJwt(resp.Auth) - assignResult.Fid = resp.FileId - assignResult.Url = resp.Location.Url - assignResult.PublicUrl = resp.Location.PublicUrl - assignResult.GrpcPort = int(resp.Location.GrpcPort) - assignResult.Count = uint64(resp.Count) - - return nil - }) - if assignErr != nil { - return assignErr - } - - return nil - }); err != nil { - return nil, nil, err - } - - // upload data - targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) - uploadOption := &operation.UploadOption{ - UploadUrl: targetUrl, - Filename: "", - Cipher: broker.option.Cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - Jwt: assignResult.Auth, - } - uploadResult, err := operation.UploadData(data, uploadOption) - if err != nil { - return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) - } - // println("uploaded to", targetUrl) - return assignResult, uploadResult, nil -} - -var _ = filer_pb.FilerClient(&MessageBroker{}) - -func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { - - for _, filer := range broker.option.Filers { - if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { - if err == io.EOF { - return - } - glog.V(0).Infof("fail to connect to %s: %v", filer, err) - } else { - break - } - } - - return - -} - -func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string { - return location.Url -} diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go deleted file mode 100644 index ba141fdd0..000000000 --- a/weed/messaging/broker/broker_grpc_server.go +++ /dev/null @@ -1,37 +0,0 @@ -package broker - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) { - panic("implement me") -} - -func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) { - resp := &messaging_pb.DeleteTopicResponse{} - dir, entry := genTopicDirEntry(request.Namespace, request.Topic) - if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { - return nil, err - } else if exists { - err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil) - } - return resp, nil -} - -func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { - panic("implement me") -} - -func genTopicDir(namespace, topic string) string { - return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic) -} - -func genTopicDirEntry(namespace, topic string) (dir, entry string) { - return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic -} diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go deleted file mode 100644 index 5cd8edd33..000000000 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ /dev/null @@ -1,122 +0,0 @@ -package broker - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/cluster" - "github.com/chrislusf/seaweedfs/weed/pb" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -/* -Topic discovery: - -When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker. - -The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it. -Otherwise, just host the topic. - -So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy. -If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help. - -*/ - -func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) { - - t := &messaging_pb.FindBrokerResponse{} - var peers []string - - targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) - - for _, filer := range broker.option.Filers { - err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ - Resource: targetTopicPartition, - }) - if err != nil { - return err - } - if resp.Found && len(resp.Resources) > 0 { - t.Broker = resp.Resources[0].GrpcAddresses - return nil - } - for _, b := range resp.Resources { - peers = append(peers, b.GrpcAddresses) - } - return nil - }) - if err != nil { - return nil, err - } - } - - t.Broker = PickMember(peers, []byte(targetTopicPartition)) - - return t, nil - -} - -func (broker *MessageBroker) checkFilers() { - - // contact a filer about masters - var masters []pb.ServerAddress - found := false - for !found { - for _, filer := range broker.option.Filers { - err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err - } - for _, m := range resp.Masters { - masters = append(masters, pb.ServerAddress(m)) - } - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received master list: %s", masters) - - // contact each masters for filers - var filers []pb.ServerAddress - found = false - for !found { - for _, master := range masters { - err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.FilerType, - }) - if err != nil { - return err - } - - for _, clusterNode := range resp.ClusterNodes { - filers = append(filers, pb.ServerAddress(clusterNode.Address)) - } - - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to list filers: %v", err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received filer list: %s", filers) - - broker.option.Filers = filers - -} diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go deleted file mode 100644 index 6e6b723d1..000000000 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ /dev/null @@ -1,112 +0,0 @@ -package broker - -import ( - "crypto/md5" - "fmt" - "io" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error { - - // process initial request - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - // TODO look it up - topicConfig := &messaging_pb.TopicConfiguration{ - // IsTransient: true, - } - - // send init response - initResponse := &messaging_pb.PublishResponse{ - Config: nil, - Redirect: nil, - } - err = stream.Send(initResponse) - if err != nil { - return err - } - if initResponse.Redirect != nil { - return nil - } - - // get lock - tp := TopicPartition{ - Namespace: in.Init.Namespace, - Topic: in.Init.Topic, - Partition: in.Init.Partition, - } - - tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic) - md5File := fmt.Sprintf("p%02d.md5", tp.Partition) - // println("chan data stored under", tpDir, "as", md5File) - - if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists { - return fmt.Errorf("channel is already closed") - } - - tl := broker.topicManager.RequestLock(tp, topicConfig, true) - defer broker.topicManager.ReleaseLock(tp, true) - - md5hash := md5.New() - // process each message - for { - // println("recv") - in, err := stream.Recv() - // glog.V(0).Infof("recieved %v err: %v", in, err) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - if in.Data == nil { - continue - } - - // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value)) - - data, err := proto.Marshal(in.Data) - if err != nil { - glog.Errorf("marshall error: %v\n", err) - continue - } - - tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs) - - if in.Data.IsClose { - // println("server received closing") - break - } - - md5hash.Write(in.Data.Value) - - } - - if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil { - glog.V(0).Infof("err writing %s: %v", md5File, err) - } - - // fmt.Printf("received md5 %X\n", md5hash.Sum(nil)) - - // send the close ack - // println("server send ack closing") - if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil { - glog.V(0).Infof("err sending close response: %v", err) - } - return nil - -} diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go deleted file mode 100644 index 20d529239..000000000 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ /dev/null @@ -1,178 +0,0 @@ -package broker - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" - "io" - "strings" - "time" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { - - // process initial request - in, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - var processedTsNs int64 - var messageCount int64 - subscriberId := in.Init.SubscriberId - - // TODO look it up - topicConfig := &messaging_pb.TopicConfiguration{ - // IsTransient: true, - } - - // get lock - tp := TopicPartition{ - Namespace: in.Init.Namespace, - Topic: in.Init.Topic, - Partition: in.Init.Partition, - } - fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String()) - defer func() { - fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs)) - }() - - lock := broker.topicManager.RequestLock(tp, topicConfig, false) - defer broker.topicManager.ReleaseLock(tp, false) - - isConnected := true - go func() { - for isConnected { - if _, err := stream.Recv(); err != nil { - // println("disconnecting connection to", subscriberId, tp.String()) - isConnected = false - lock.cond.Signal() - } - } - }() - - lastReadTime := time.Now() - switch in.Init.StartPosition { - case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP: - lastReadTime = time.Unix(0, in.Init.TimestampNs) - case messaging_pb.SubscriberMessage_InitMessage_LATEST: - case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: - lastReadTime = time.Unix(0, 0) - } - - // how to process each message - // an error returned will end the subscription - eachMessageFn := func(m *messaging_pb.Message) error { - err := stream.Send(&messaging_pb.BrokerMessage{ - Data: m, - }) - if err != nil { - glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) - } - return err - } - - eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { - m := &messaging_pb.Message{} - if err = proto.Unmarshal(logEntry.Data, m); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) - return err - } - // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs) - if err = eachMessageFn(m); err != nil { - glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) - return err - } - if m.IsClose { - // println("processed EOF") - return io.EOF - } - processedTsNs = logEntry.TsNs - messageCount++ - return nil - } - - // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime) - - for { - - if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { - if err != io.EOF { - // println("stopping from persisted logs", err.Error()) - return err - } - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - - lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() - return isConnected - }, eachLogEntryFn) - if err != nil { - if err == log_buffer.ResumeFromDiskError { - continue - } - glog.Errorf("processed to %v: %v", lastReadTime, err) - if err != log_buffer.ResumeError { - break - } - } - } - - return err - -} - -func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { - startTime = startTime.UTC() - startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) - - sizeBuf := make([]byte, 4) - startTsNs := startTime.UnixNano() - - topicDir := genTopicDir(tp.Namespace, tp.Topic) - partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) - - return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { - dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) - return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { - if dayEntry.Name == startDate { - hourMinute := util.FileNameBase(hourMinuteEntry.Name) - if strings.Compare(hourMinute, startHourMinute) < 0 { - return nil - } - } - if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) { - return nil - } - // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) - chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) - defer chunkedFileReader.Close() - if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil { - chunkedFileReader.Close() - if err == io.EOF { - return err - } - return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) - } - return nil - }, "", false, 24*60) - }, startDate, true, 366) - -} diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go deleted file mode 100644 index acf2d6d34..000000000 --- a/weed/messaging/broker/broker_server.go +++ /dev/null @@ -1,116 +0,0 @@ -package broker - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "time" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" -) - -type MessageBrokerOption struct { - Filers []pb.ServerAddress - DefaultReplication string - MaxMB int - Ip string - Port int - Cipher bool -} - -type MessageBroker struct { - messaging_pb.UnimplementedSeaweedMessagingServer - option *MessageBrokerOption - grpcDialOption grpc.DialOption - topicManager *TopicManager -} - -func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { - - messageBroker = &MessageBroker{ - option: option, - grpcDialOption: grpcDialOption, - } - - messageBroker.topicManager = NewTopicManager(messageBroker) - - messageBroker.checkFilers() - - go messageBroker.keepConnectedToOneFiler() - - return messageBroker, nil -} - -func (broker *MessageBroker) keepConnectedToOneFiler() { - - for { - for _, filer := range broker.option.Filers { - broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.KeepConnected(ctx) - if err != nil { - glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - - initRequest := &filer_pb.KeepConnectedRequest{ - Name: broker.option.Ip, - GrpcPort: uint32(broker.option.Port), - } - for _, tp := range broker.topicManager.ListTopicPartitions() { - initRequest.Resources = append(initRequest.Resources, tp.String()) - } - if err := stream.Send(&filer_pb.KeepConnectedRequest{ - Name: broker.option.Ip, - GrpcPort: uint32(broker.option.Port), - }); err != nil { - glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - - // TODO send events of adding/removing topics - - glog.V(0).Infof("conntected with filer: %v", filer) - for { - if err := stream.Send(&filer_pb.KeepConnectedRequest{ - Name: broker.option.Ip, - GrpcPort: uint32(broker.option.Port), - }); err != nil { - glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - // println("send heartbeat") - if _, err := stream.Recv(); err != nil { - glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err) - return err - } - // println("received reply") - time.Sleep(11 * time.Second) - // println("woke up") - } - return nil - }) - time.Sleep(3 * time.Second) - } - } - -} - -func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { - - return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) - -} - -func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - - return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { - return fn(client) - }) - -} diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go deleted file mode 100644 index 465a2a8f2..000000000 --- a/weed/messaging/broker/consistent_distribution.go +++ /dev/null @@ -1,38 +0,0 @@ -package broker - -import ( - "github.com/buraksezer/consistent" - "github.com/cespare/xxhash" -) - -type Member string - -func (m Member) String() string { - return string(m) -} - -type hasher struct{} - -func (h hasher) Sum64(data []byte) uint64 { - return xxhash.Sum64(data) -} - -func PickMember(members []string, key []byte) string { - cfg := consistent.Config{ - PartitionCount: 9791, - ReplicationFactor: 2, - Load: 1.25, - Hasher: hasher{}, - } - - cmembers := []consistent.Member{} - for _, m := range members { - cmembers = append(cmembers, Member(m)) - } - - c := consistent.New(cmembers, cfg) - - m := c.LocateKey(key) - - return m.String() -} diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go deleted file mode 100644 index f58fe4e0e..000000000 --- a/weed/messaging/broker/consistent_distribution_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package broker - -import ( - "fmt" - "testing" -) - -func TestPickMember(t *testing.T) { - - servers := []string{ - "s1:port", - "s2:port", - "s3:port", - "s5:port", - "s4:port", - } - - total := 1000 - - distribution := make(map[string]int) - for i := 0; i < total; i++ { - tp := fmt.Sprintf("tp:%2d", i) - m := PickMember(servers, []byte(tp)) - // println(tp, "=>", m) - distribution[m]++ - } - - for member, count := range distribution { - fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers))) - } - -} diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go deleted file mode 100644 index c303c29b3..000000000 --- a/weed/messaging/broker/topic_manager.go +++ /dev/null @@ -1,124 +0,0 @@ -package broker - -import ( - "fmt" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" -) - -type TopicPartition struct { - Namespace string - Topic string - Partition int32 -} - -const ( - TopicPartitionFmt = "%s/%s_%02d" -) - -func (tp *TopicPartition) String() string { - return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) -} - -type TopicControl struct { - sync.Mutex - cond *sync.Cond - subscriberCount int - publisherCount int - logBuffer *log_buffer.LogBuffer -} - -type TopicManager struct { - sync.Mutex - topicControls map[TopicPartition]*TopicControl - broker *MessageBroker -} - -func NewTopicManager(messageBroker *MessageBroker) *TopicManager { - return &TopicManager{ - topicControls: make(map[TopicPartition]*TopicControl), - broker: messageBroker, - } -} - -func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { - - flushFn := func(startTime, stopTime time.Time, buf []byte) { - - if topicConfig.IsTransient { - // return - } - - // fmt.Printf("flushing with topic config %+v\n", topicConfig) - - startTime, stopTime = startTime.UTC(), stopTime.UTC() - targetFile := fmt.Sprintf( - "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", - filer.TopicsDir, tp.Namespace, tp.Topic, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - tp.Partition, - ) - - if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil { - glog.V(0).Infof("log write failed %s: %v", targetFile, err) - } - } - logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() { - tl.cond.Broadcast() - }) - - return logBuffer -} - -func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { - tm.Lock() - defer tm.Unlock() - - tc, found := tm.topicControls[partition] - if !found { - tc = &TopicControl{} - tc.cond = sync.NewCond(&tc.Mutex) - tm.topicControls[partition] = tc - tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) - } - if isPublisher { - tc.publisherCount++ - } else { - tc.subscriberCount++ - } - return tc -} - -func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { - tm.Lock() - defer tm.Unlock() - - lock, found := tm.topicControls[partition] - if !found { - return - } - if isPublisher { - lock.publisherCount-- - } else { - lock.subscriberCount-- - } - if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { - delete(tm.topicControls, partition) - lock.logBuffer.Shutdown() - } -} - -func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { - tm.Lock() - defer tm.Unlock() - - for k := range tm.topicControls { - tps = append(tps, k) - } - return -} diff --git a/weed/messaging/msgclient/chan_config.go b/weed/messaging/msgclient/chan_config.go deleted file mode 100644 index a75678815..000000000 --- a/weed/messaging/msgclient/chan_config.go +++ /dev/null @@ -1,5 +0,0 @@ -package msgclient - -func (mc *MessagingClient) DeleteChannel(chanName string) error { - return mc.DeleteTopic("chan", chanName) -} diff --git a/weed/messaging/msgclient/chan_pub.go b/weed/messaging/msgclient/chan_pub.go deleted file mode 100644 index 9bc88f7c0..000000000 --- a/weed/messaging/msgclient/chan_pub.go +++ /dev/null @@ -1,76 +0,0 @@ -package msgclient - -import ( - "crypto/md5" - "hash" - "io" - "log" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -type PubChannel struct { - client messaging_pb.SeaweedMessaging_PublishClient - grpcConnection *grpc.ClientConn - md5hash hash.Hash -} - -func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { - tp := broker.TopicPartition{ - Namespace: "chan", - Topic: chanName, - Partition: 0, - } - grpcConnection, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - pc, err := setupPublisherClient(grpcConnection, tp) - if err != nil { - return nil, err - } - return &PubChannel{ - client: pc, - grpcConnection: grpcConnection, - md5hash: md5.New(), - }, nil -} - -func (pc *PubChannel) Publish(m []byte) error { - err := pc.client.Send(&messaging_pb.PublishRequest{ - Data: &messaging_pb.Message{ - Value: m, - }, - }) - if err == nil { - pc.md5hash.Write(m) - } - return err -} -func (pc *PubChannel) Close() error { - - // println("send closing") - if err := pc.client.Send(&messaging_pb.PublishRequest{ - Data: &messaging_pb.Message{ - IsClose: true, - }, - }); err != nil { - log.Printf("err send close: %v", err) - } - // println("receive closing") - if _, err := pc.client.Recv(); err != nil && err != io.EOF { - log.Printf("err receive close: %v", err) - } - // println("close connection") - if err := pc.grpcConnection.Close(); err != nil { - log.Printf("err connection close: %v", err) - } - return nil -} - -func (pc *PubChannel) Md5() []byte { - return pc.md5hash.Sum(nil) -} diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/messaging/msgclient/chan_sub.go deleted file mode 100644 index 213ff4666..000000000 --- a/weed/messaging/msgclient/chan_sub.go +++ /dev/null @@ -1,85 +0,0 @@ -package msgclient - -import ( - "context" - "crypto/md5" - "hash" - "io" - "log" - "time" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -type SubChannel struct { - ch chan []byte - stream messaging_pb.SeaweedMessaging_SubscribeClient - md5hash hash.Hash - cancel context.CancelFunc -} - -func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { - tp := broker.TopicPartition{ - Namespace: "chan", - Topic: chanName, - Partition: 0, - } - grpcConnection, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) - sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0)) - if err != nil { - return nil, err - } - - t := &SubChannel{ - ch: make(chan []byte), - stream: sc, - md5hash: md5.New(), - cancel: cancel, - } - - go func() { - for { - resp, subErr := t.stream.Recv() - if subErr == io.EOF { - return - } - if subErr != nil { - log.Printf("fail to receive from netchan %s: %v", chanName, subErr) - return - } - if resp.Data == nil { - // this could be heartbeat from broker - continue - } - if resp.Data.IsClose { - t.stream.Send(&messaging_pb.SubscriberMessage{ - IsClose: true, - }) - close(t.ch) - cancel() - return - } - t.ch <- resp.Data.Value - t.md5hash.Write(resp.Data.Value) - } - }() - - return t, nil -} - -func (sc *SubChannel) Channel() chan []byte { - return sc.ch -} - -func (sc *SubChannel) Md5() []byte { - return sc.md5hash.Sum(nil) -} - -func (sc *SubChannel) Cancel() { - sc.cancel() -} diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go deleted file mode 100644 index 4d7ef2b8e..000000000 --- a/weed/messaging/msgclient/client.go +++ /dev/null @@ -1,55 +0,0 @@ -package msgclient - -import ( - "context" - "fmt" - "log" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnections map[broker.TopicPartition]*grpc.ClientConn - grpcDialOption grpc.DialOption -} - -func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), - } -} - -func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { - - for _, broker := range mc.bootstrapBrokers { - grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) - if err != nil { - log.Printf("dial broker %s: %v", broker, err) - continue - } - defer grpcConnection.Close() - - resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), - &messaging_pb.FindBrokerRequest{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Parition: tp.Partition, - }) - if err != nil { - return nil, err - } - - targetBroker := resp.Broker - return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) - } - return nil, fmt.Errorf("no broker found for %+v", tp) -} diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go deleted file mode 100644 index 2b9eba1a8..000000000 --- a/weed/messaging/msgclient/config.go +++ /dev/null @@ -1,63 +0,0 @@ -package msgclient - -import ( - "context" - "log" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error { - - return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error { - _, err := client.ConfigureTopic(context.Background(), - &messaging_pb.ConfigureTopicRequest{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Configuration: &messaging_pb.TopicConfiguration{ - PartitionCount: 0, - Collection: "", - Replication: "", - IsTransient: false, - Partitoning: 0, - }, - }) - return err - }) - -} - -func (mc *MessagingClient) DeleteTopic(namespace, topic string) error { - - return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error { - _, err := client.DeleteTopic(context.Background(), - &messaging_pb.DeleteTopicRequest{ - Namespace: namespace, - Topic: topic, - }) - return err - }) -} - -func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error { - - var lastErr error - for _, broker := range mc.bootstrapBrokers { - grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) - if err != nil { - log.Printf("dial broker %s: %v", broker, err) - continue - } - defer grpcConnection.Close() - - err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection)) - if err == nil { - return nil - } - lastErr = err - } - - return lastErr -} diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go deleted file mode 100644 index 1aa483ff8..000000000 --- a/weed/messaging/msgclient/publisher.go +++ /dev/null @@ -1,118 +0,0 @@ -package msgclient - -import ( - "context" - - "github.com/OneOfOne/xxhash" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -type Publisher struct { - publishClients []messaging_pb.SeaweedMessaging_PublishClient - topicConfiguration *messaging_pb.TopicConfiguration - messageCount uint64 - publisherId string -} - -func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { - // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ - PartitionCount: 4, - } - publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - tp := broker.TopicPartition{ - Namespace: namespace, - Topic: topic, - Partition: int32(i), - } - grpcClientConn, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - client, err := setupPublisherClient(grpcClientConn, tp) - if err != nil { - return nil, err - } - publishClients[i] = client - } - return &Publisher{ - publishClients: publishClients, - topicConfiguration: topicConfiguration, - }, nil -} - -func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { - - stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) - if err != nil { - return nil, err - } - - // send init message - err = stream.Send(&messaging_pb.PublishRequest{ - Init: &messaging_pb.PublishRequest_InitMessage{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Partition: tp.Partition, - }, - }) - if err != nil { - return nil, err - } - - // process init response - initResponse, err := stream.Recv() - if err != nil { - return nil, err - } - if initResponse.Redirect != nil { - // TODO follow redirection - } - if initResponse.Config != nil { - } - - // setup looks for control messages - doneChan := make(chan error, 1) - go func() { - for { - in, err := stream.Recv() - if err != nil { - doneChan <- err - return - } - if in.Redirect != nil { - } - if in.Config != nil { - } - } - }() - - return stream, nil - -} - -func (p *Publisher) Publish(m *messaging_pb.Message) error { - hashValue := p.messageCount - p.messageCount++ - if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { - if m.Key != nil { - hashValue = xxhash.Checksum64(m.Key) - } - } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { - hashValue = xxhash.Checksum64(m.Key) - } else { - // round robin - } - - idx := int(hashValue) % len(p.publishClients) - if idx < 0 { - idx += len(p.publishClients) - } - return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ - Data: m, - }) -} diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go deleted file mode 100644 index 6c7dc1ab7..000000000 --- a/weed/messaging/msgclient/subscriber.go +++ /dev/null @@ -1,120 +0,0 @@ -package msgclient - -import ( - "context" - "io" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "google.golang.org/grpc" -) - -type Subscriber struct { - subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient - subscriberCancels []context.CancelFunc - subscriberId string -} - -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { - // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ - PartitionCount: 4, - } - subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) - subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) - - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - if partitionId >= 0 && i != partitionId { - continue - } - tp := broker.TopicPartition{ - Namespace: namespace, - Topic: topic, - Partition: int32(i), - } - grpcClientConn, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) - client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) - if err != nil { - return nil, err - } - subscriberClients[i] = client - subscriberCancels[i] = cancel - } - - return &Subscriber{ - subscriberClients: subscriberClients, - subscriberCancels: subscriberCancels, - subscriberId: subscriberId, - }, nil -} - -func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { - stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) - if err != nil { - return - } - - // send init message - err = stream.Send(&messaging_pb.SubscriberMessage{ - Init: &messaging_pb.SubscriberMessage_InitMessage{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Partition: tp.Partition, - StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: startTime.UnixNano(), - SubscriberId: subscriberId, - }, - }) - if err != nil { - return - } - - return stream, nil -} - -func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { - for { - resp, listenErr := subscriberClient.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - println(listenErr.Error()) - return listenErr - } - if resp.Data == nil { - // this could be heartbeat from broker - continue - } - processFn(resp.Data) - } -} - -// Subscribe starts goroutines to process the messages -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { - var wg sync.WaitGroup - for i := 0; i < len(s.subscriberClients); i++ { - if s.subscriberClients[i] != nil { - wg.Add(1) - go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) { - defer wg.Done() - doSubscribe(subscriberClient, processFn) - }(s.subscriberClients[i]) - } - } - wg.Wait() -} - -func (s *Subscriber) Shutdown() { - for i := 0; i < len(s.subscriberClients); i++ { - if s.subscriberCancels[i] != nil { - s.subscriberCancels[i]() - } - } -} |
