diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2020-05-17 17:39:16 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-17 17:39:16 -0700 |
| commit | e0e31e67a809d00c99edaa299531c7ce4d4750dc (patch) | |
| tree | 0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/messaging | |
| parent | b4e02ec525a6ec87b26686202307896faf3296a7 (diff) | |
| parent | 081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff) | |
| download | seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip | |
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/messaging')
18 files changed, 870 insertions, 326 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index e87e197b0..80f107e00 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -3,6 +3,7 @@ package broker import ( "context" "fmt" + "io" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" @@ -94,6 +95,9 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient for _, filer := range broker.option.Filers { if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + if err == io.EOF { + return + } glog.V(0).Infof("fail to connect to %s: %v", filer, err) } else { break diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go index 447620a6b..6918a28a6 100644 --- a/weed/messaging/broker/broker_grpc_server.go +++ b/weed/messaging/broker/broker_grpc_server.go @@ -2,7 +2,10 @@ package broker import ( "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -10,6 +13,25 @@ func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messagin panic("implement me") } +func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) { + resp := &messaging_pb.DeleteTopicResponse{} + dir, entry := genTopicDirEntry(request.Namespace, request.Topic) + if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { + return nil, err + } else if exists { + err = filer_pb.Remove(broker, dir, entry, true, true, true) + } + return resp, nil +} + func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { panic("implement me") } + +func genTopicDir(namespace, topic string) string { + return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace, topic) +} + +func genTopicDirEntry(namespace, topic string) (dir, entry string) { + return fmt.Sprintf("%s/%s", filer2.TopicsDir, namespace), topic +} diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go new file mode 100644 index 000000000..3c14f3220 --- /dev/null +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -0,0 +1,116 @@ +package broker + +import ( + "context" + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +/* +Topic discovery: + +When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker. + +The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it. +Otherwise, just host the topic. + +So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy. +If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help. + +*/ + +func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) { + + t := &messaging_pb.FindBrokerResponse{} + var peers []string + + targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) + + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ + Resource: targetTopicPartition, + }) + if err != nil { + return err + } + if resp.Found && len(resp.Resources) > 0 { + t.Broker = resp.Resources[0].GrpcAddresses + return nil + } + for _, b := range resp.Resources { + peers = append(peers, b.GrpcAddresses) + } + return nil + }) + if err != nil { + return nil, err + } + } + + t.Broker = PickMember(peers, []byte(targetTopicPartition)) + + return t, nil + +} + +func (broker *MessageBroker) checkFilers() { + + // contact a filer about masters + var masters []string + found := false + for !found { + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err == nil { + found = true + break + } + glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) + time.Sleep(time.Second) + } + } + glog.V(0).Infof("received master list: %s", masters) + + // contact each masters for filers + var filers []string + found = false + for !found { + for _, master := range masters { + err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ + ClientType: "filer", + }) + if err != nil { + return err + } + + filers = append(filers, resp.GrpcAddresses...) + + return nil + }) + if err == nil { + found = true + break + } + glog.V(0).Infof("failed to list filers: %v", err) + time.Sleep(time.Second) + } + } + glog.V(0).Infof("received filer list: %s", filers) + + broker.option.Filers = filers + +} diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index b3a909a6c..dc11061af 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -1,11 +1,15 @@ package broker import ( + "crypto/md5" + "fmt" "io" "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -44,27 +48,24 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis Topic: in.Init.Topic, Partition: in.Init.Partition, } - tl := broker.topicLocks.RequestLock(tp, topicConfig, true) - defer broker.topicLocks.ReleaseLock(tp, true) - - updatesChan := make(chan int32) - - go func() { - for update := range updatesChan { - if err := stream.Send(&messaging_pb.PublishResponse{ - Config: &messaging_pb.PublishResponse_ConfigMessage{ - PartitionCount: update, - }, - }); err != nil { - glog.V(0).Infof("err sending publish response: %v", err) - return - } - } - }() + tpDir := fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, tp.Namespace, tp.Topic) + md5File := fmt.Sprintf("p%02d.md5", tp.Partition) + // println("chan data stored under", tpDir, "as", md5File) + + if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists { + return fmt.Errorf("channel is already closed") + } + + tl := broker.topicManager.RequestLock(tp, topicConfig, true) + defer broker.topicManager.ReleaseLock(tp, true) + + md5hash := md5.New() // process each message for { + // println("recv") in, err := stream.Recv() + // glog.V(0).Infof("recieved %v err: %v", in, err) if err == io.EOF { return nil } @@ -86,5 +87,26 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis tl.logBuffer.AddToBuffer(in.Data.Key, data) + if in.Data.IsClose { + // println("server received closing") + break + } + + md5hash.Write(in.Data.Value) + + } + + if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil { + glog.V(0).Infof("err writing %s: %v", md5File, err) } + + // fmt.Printf("received md5 %X\n", md5hash.Sum(nil)) + + // send the close ack + // println("server send ack closing") + if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil { + glog.V(0).Infof("err sending close response: %v", err) + } + return nil + } diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 379063eed..9538d3063 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -25,32 +25,39 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } + var processedTsNs int64 var messageCount int64 subscriberId := in.Init.SubscriberId - fmt.Printf("+ subscriber %s\n", subscriberId) - defer func() { - fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount) - }() // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ // IsTransient: true, } - if err = stream.Send(&messaging_pb.BrokerMessage{ - Redirect: nil, - }); err != nil { - return err - } - // get lock tp := TopicPartition{ Namespace: in.Init.Namespace, Topic: in.Init.Topic, Partition: in.Init.Partition, } - lock := broker.topicLocks.RequestLock(tp, topicConfig, false) - defer broker.topicLocks.ReleaseLock(tp, false) + fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String()) + defer func() { + fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs)) + }() + + lock := broker.topicManager.RequestLock(tp, topicConfig, false) + defer broker.topicManager.ReleaseLock(tp, false) + + isConnected := true + go func() { + for isConnected { + if _, err := stream.Recv(); err != nil { + // println("disconnecting connection to", subscriberId, tp.String()) + isConnected = false + lock.cond.Signal() + } + } + }() lastReadTime := time.Now() switch in.Init.StartPosition { @@ -58,8 +65,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lastReadTime = time.Unix(0, in.Init.TimestampNs) case messaging_pb.SubscriberMessage_InitMessage_LATEST: case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: + lastReadTime = time.Unix(0, 0) } - var processedTsNs int64 // how to process each message // an error returned will end the subscription @@ -84,23 +91,33 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) return err } + if m.IsClose { + // println("processed EOF") + return io.EOF + } processedTsNs = logEntry.TsNs + messageCount++ return nil } if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { - return err + if err != io.EOF { + // println("stopping from persisted logs", err.Error()) + return err + } } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) } - messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime) + + err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() lock.Mutex.Unlock() - return true + return isConnected }, eachLogEntryFn) return err @@ -114,7 +131,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() - topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic) + topicDir := genTopicDir(tp.Namespace, tp.Topic) partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { @@ -125,7 +142,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim return nil } } - if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){ + if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) { return nil } // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) @@ -134,7 +151,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { - return nil + return err } return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) } diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 29c227274..0c04d2841 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -16,6 +16,7 @@ type MessageBrokerOption struct { Filers []string DefaultReplication string MaxMB int + Ip string Port int Cipher bool } @@ -23,7 +24,7 @@ type MessageBrokerOption struct { type MessageBroker struct { option *MessageBrokerOption grpcDialOption grpc.DialOption - topicLocks *TopicLocks + topicManager *TopicManager } func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { @@ -33,77 +34,66 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio grpcDialOption: grpcDialOption, } - messageBroker.topicLocks = NewTopicLocks(messageBroker) + messageBroker.topicManager = NewTopicManager(messageBroker) - messageBroker.checkPeers() + messageBroker.checkFilers() - // go messageBroker.loopForEver() + go messageBroker.keepConnectedToOneFiler() return messageBroker, nil } -func (broker *MessageBroker) loopForEver() { +func (broker *MessageBroker) keepConnectedToOneFiler() { for { - broker.checkPeers() - time.Sleep(3 * time.Second) - } - -} - -func (broker *MessageBroker) checkPeers() { - - // contact a filer about masters - var masters []string - found := false - for !found { for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.KeepConnected(context.Background()) if err != nil { + glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) return err } - masters = append(masters, resp.Masters...) - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received master list: %s", masters) - - // contact each masters for filers - var filers []string - found = false - for !found { - for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ - ClientType: "filer", - }) - if err != nil { + + initRequest := &filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + } + for _, tp := range broker.topicManager.ListTopicPartitions() { + initRequest.Resources = append(initRequest.Resources, tp.String()) + } + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err) return err } - filers = append(filers, resp.GrpcAddresses...) - + // TODO send events of adding/removing topics + + glog.V(0).Infof("conntected with filer: %v", filer) + for { + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("send heartbeat") + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("received reply") + time.Sleep(11 * time.Second) + // println("woke up") + } return nil }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to list filers: %v", err) - time.Sleep(time.Second) + time.Sleep(3 * time.Second) } } - glog.V(0).Infof("received filer list: %s", filers) - - broker.option.Filers = filers } diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go new file mode 100644 index 000000000..465a2a8f2 --- /dev/null +++ b/weed/messaging/broker/consistent_distribution.go @@ -0,0 +1,38 @@ +package broker + +import ( + "github.com/buraksezer/consistent" + "github.com/cespare/xxhash" +) + +type Member string + +func (m Member) String() string { + return string(m) +} + +type hasher struct{} + +func (h hasher) Sum64(data []byte) uint64 { + return xxhash.Sum64(data) +} + +func PickMember(members []string, key []byte) string { + cfg := consistent.Config{ + PartitionCount: 9791, + ReplicationFactor: 2, + Load: 1.25, + Hasher: hasher{}, + } + + cmembers := []consistent.Member{} + for _, m := range members { + cmembers = append(cmembers, Member(m)) + } + + c := consistent.New(cmembers, cfg) + + m := c.LocateKey(key) + + return m.String() +} diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go new file mode 100644 index 000000000..f58fe4e0e --- /dev/null +++ b/weed/messaging/broker/consistent_distribution_test.go @@ -0,0 +1,32 @@ +package broker + +import ( + "fmt" + "testing" +) + +func TestPickMember(t *testing.T) { + + servers := []string{ + "s1:port", + "s2:port", + "s3:port", + "s5:port", + "s4:port", + } + + total := 1000 + + distribution := make(map[string]int) + for i := 0; i < total; i++ { + tp := fmt.Sprintf("tp:%2d", i) + m := PickMember(servers, []byte(tp)) + // println(tp, "=>", m) + distribution[m]++ + } + + for member, count := range distribution { + fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers))) + } + +} diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go deleted file mode 100644 index f8a5aa171..000000000 --- a/weed/messaging/broker/topic_lock.go +++ /dev/null @@ -1,103 +0,0 @@ -package broker - -import ( - "fmt" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" -) - -type TopicPartition struct { - Namespace string - Topic string - Partition int32 -} -type TopicLock struct { - sync.Mutex - cond *sync.Cond - subscriberCount int - publisherCount int - logBuffer *log_buffer.LogBuffer -} - -type TopicLocks struct { - sync.Mutex - locks map[TopicPartition]*TopicLock - broker *MessageBroker -} - -func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks { - return &TopicLocks{ - locks: make(map[TopicPartition]*TopicLock), - broker: messageBroker, - } -} - -func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { - - flushFn := func(startTime, stopTime time.Time, buf []byte) { - - if topicConfig.IsTransient { - // return - } - - // fmt.Printf("flushing with topic config %+v\n", topicConfig) - - targetFile := fmt.Sprintf( - "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", - filer2.TopicsDir, tp.Namespace, tp.Topic, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - tp.Partition, - ) - - if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil { - glog.V(0).Infof("log write failed %s: %v", targetFile, err) - } - } - logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { - tl.cond.Broadcast() - }) - - return logBuffer -} - -func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock { - tl.Lock() - defer tl.Unlock() - - lock, found := tl.locks[partition] - if !found { - lock = &TopicLock{} - lock.cond = sync.NewCond(&lock.Mutex) - tl.locks[partition] = lock - lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig) - } - if isPublisher { - lock.publisherCount++ - } else { - lock.subscriberCount++ - } - return lock -} - -func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { - tl.Lock() - defer tl.Unlock() - - lock, found := tl.locks[partition] - if !found { - return - } - if isPublisher { - lock.publisherCount-- - } else { - lock.subscriberCount-- - } - if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { - delete(tl.locks, partition) - } -} diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go new file mode 100644 index 000000000..b563fffa1 --- /dev/null +++ b/weed/messaging/broker/topic_manager.go @@ -0,0 +1,123 @@ +package broker + +import ( + "fmt" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +type TopicPartition struct { + Namespace string + Topic string + Partition int32 +} + +const ( + TopicPartitionFmt = "%s/%s_%02d" +) + +func (tp *TopicPartition) String() string { + return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) +} + +type TopicControl struct { + sync.Mutex + cond *sync.Cond + subscriberCount int + publisherCount int + logBuffer *log_buffer.LogBuffer +} + +type TopicManager struct { + sync.Mutex + topicControls map[TopicPartition]*TopicControl + broker *MessageBroker +} + +func NewTopicManager(messageBroker *MessageBroker) *TopicManager { + return &TopicManager{ + topicControls: make(map[TopicPartition]*TopicControl), + broker: messageBroker, + } +} + +func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { + + flushFn := func(startTime, stopTime time.Time, buf []byte) { + + if topicConfig.IsTransient { + // return + } + + // fmt.Printf("flushing with topic config %+v\n", topicConfig) + + targetFile := fmt.Sprintf( + "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", + filer2.TopicsDir, tp.Namespace, tp.Topic, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + tp.Partition, + ) + + if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil { + glog.V(0).Infof("log write failed %s: %v", targetFile, err) + } + } + logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { + tl.cond.Broadcast() + }) + + return logBuffer +} + +func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { + tm.Lock() + defer tm.Unlock() + + tc, found := tm.topicControls[partition] + if !found { + tc = &TopicControl{} + tc.cond = sync.NewCond(&tc.Mutex) + tm.topicControls[partition] = tc + tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) + } + if isPublisher { + tc.publisherCount++ + } else { + tc.subscriberCount++ + } + return tc +} + +func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { + tm.Lock() + defer tm.Unlock() + + lock, found := tm.topicControls[partition] + if !found { + return + } + if isPublisher { + lock.publisherCount-- + } else { + lock.subscriberCount-- + } + if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { + delete(tm.topicControls, partition) + lock.logBuffer.Shutdown() + } +} + +func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { + tm.Lock() + defer tm.Unlock() + + for k := range tm.topicControls { + tps = append(tps, k) + } + return +} diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go deleted file mode 100644 index 4a674a9fc..000000000 --- a/weed/messaging/client/client.go +++ /dev/null @@ -1,30 +0,0 @@ -package client - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnection *grpc.ClientConn -} - -func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) { - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client") - - grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption) - if err != nil { - return nil, err - } - - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnection: grpcConnection, - }, nil -} diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go deleted file mode 100644 index 53e7ffc7d..000000000 --- a/weed/messaging/client/subscriber.go +++ /dev/null @@ -1,91 +0,0 @@ -package client - -import ( - "context" - "io" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -type Subscriber struct { - subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient - subscriberId string -} - -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { - // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ - PartitionCount: 4, - } - subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) - - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) - if err != nil { - return nil, err - } - subscriberClients[i] = client - } - - return &Subscriber{ - subscriberClients: subscriberClients, - subscriberId: subscriberId, - }, nil -} - -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { - - stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) - if err != nil { - return nil, err - } - - // send init message - err = stream.Send(&messaging_pb.SubscriberMessage{ - Init: &messaging_pb.SubscriberMessage_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, - StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: startTime.UnixNano(), - SubscriberId: subscriberId, - }, - }) - if err != nil { - return nil, err - } - - // process init response - initResponse, err := stream.Recv() - if err != nil { - return nil, err - } - if initResponse.Redirect != nil { - // TODO follow redirection - } - - return stream, nil - -} - -func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { - for { - resp, listenErr := s.subscriberClients[partition].Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - println(listenErr.Error()) - return listenErr - } - processFn(resp.Data) - } -} - -// Subscribe starts goroutines to process the messages -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { - for i := 0; i < len(s.subscriberClients); i++ { - go s.doSubscribe(i, processFn) - } -} diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go new file mode 100644 index 000000000..4d7ef2b8e --- /dev/null +++ b/weed/messaging/msgclient/client.go @@ -0,0 +1,55 @@ +package msgclient + +import ( + "context" + "fmt" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MessagingClient struct { + bootstrapBrokers []string + grpcConnections map[broker.TopicPartition]*grpc.ClientConn + grpcDialOption grpc.DialOption +} + +func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { + return &MessagingClient{ + bootstrapBrokers: bootstrapBrokers, + grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), + } +} + +func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { + + for _, broker := range mc.bootstrapBrokers { + grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) + if err != nil { + log.Printf("dial broker %s: %v", broker, err) + continue + } + defer grpcConnection.Close() + + resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), + &messaging_pb.FindBrokerRequest{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Parition: tp.Partition, + }) + if err != nil { + return nil, err + } + + targetBroker := resp.Broker + return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) + } + return nil, fmt.Errorf("no broker found for %+v", tp) +} diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go new file mode 100644 index 000000000..2b9eba1a8 --- /dev/null +++ b/weed/messaging/msgclient/config.go @@ -0,0 +1,63 @@ +package msgclient + +import ( + "context" + "log" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error { + + return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error { + _, err := client.ConfigureTopic(context.Background(), + &messaging_pb.ConfigureTopicRequest{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Configuration: &messaging_pb.TopicConfiguration{ + PartitionCount: 0, + Collection: "", + Replication: "", + IsTransient: false, + Partitoning: 0, + }, + }) + return err + }) + +} + +func (mc *MessagingClient) DeleteTopic(namespace, topic string) error { + + return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error { + _, err := client.DeleteTopic(context.Background(), + &messaging_pb.DeleteTopicRequest{ + Namespace: namespace, + Topic: topic, + }) + return err + }) +} + +func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error { + + var lastErr error + for _, broker := range mc.bootstrapBrokers { + grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) + if err != nil { + log.Printf("dial broker %s: %v", broker, err) + continue + } + defer grpcConnection.Close() + + err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection)) + if err == nil { + return nil + } + lastErr = err + } + + return lastErr +} diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go new file mode 100644 index 000000000..9bc88f7c0 --- /dev/null +++ b/weed/messaging/msgclient/pub_chan.go @@ -0,0 +1,76 @@ +package msgclient + +import ( + "crypto/md5" + "hash" + "io" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type PubChannel struct { + client messaging_pb.SeaweedMessaging_PublishClient + grpcConnection *grpc.ClientConn + md5hash hash.Hash +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + grpcConnection: grpcConnection, + md5hash: md5.New(), + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + err := pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + Value: m, + }, + }) + if err == nil { + pc.md5hash.Write(m) + } + return err +} +func (pc *PubChannel) Close() error { + + // println("send closing") + if err := pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + IsClose: true, + }, + }); err != nil { + log.Printf("err send close: %v", err) + } + // println("receive closing") + if _, err := pc.client.Recv(); err != nil && err != io.EOF { + log.Printf("err receive close: %v", err) + } + // println("close connection") + if err := pc.grpcConnection.Close(); err != nil { + log.Printf("err connection close: %v", err) + } + return nil +} + +func (pc *PubChannel) Md5() []byte { + return pc.md5hash.Sum(nil) +} diff --git a/weed/messaging/client/publisher.go b/weed/messaging/msgclient/publisher.go index 68e5729c1..b0fc5afbf 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/msgclient/publisher.go @@ -1,10 +1,12 @@ -package client +package msgclient import ( "context" "github.com/OneOfOne/xxhash" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -22,7 +24,16 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* } publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupPublisherClient(namespace, topic, int32(i)) + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + client, err := setupPublisherClient(grpcClientConn, tp) if err != nil { return nil, err } @@ -34,9 +45,9 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* }, nil } -func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) { +func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { - stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) + stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) if err != nil { return nil, err } @@ -44,9 +55,9 @@ func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partiti // send init message err = stream.Send(&messaging_pb.PublishRequest{ Init: &messaging_pb.PublishRequest_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, }, }) if err != nil { @@ -105,9 +116,3 @@ func (p *Publisher) Publish(m *messaging_pb.Message) error { Data: m, }) } - -func (p *Publisher) Shutdown() { - for _, client := range p.publishClients { - client.CloseSend() - } -} diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go new file mode 100644 index 000000000..213ff4666 --- /dev/null +++ b/weed/messaging/msgclient/sub_chan.go @@ -0,0 +1,85 @@ +package msgclient + +import ( + "context" + "crypto/md5" + "hash" + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type SubChannel struct { + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient + md5hash hash.Hash + cancel context.CancelFunc +} + +func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0)) + if err != nil { + return nil, err + } + + t := &SubChannel{ + ch: make(chan []byte), + stream: sc, + md5hash: md5.New(), + cancel: cancel, + } + + go func() { + for { + resp, subErr := t.stream.Recv() + if subErr == io.EOF { + return + } + if subErr != nil { + log.Printf("fail to receive from netchan %s: %v", chanName, subErr) + return + } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } + if resp.Data.IsClose { + t.stream.Send(&messaging_pb.SubscriberMessage{ + IsClose: true, + }) + close(t.ch) + cancel() + return + } + t.ch <- resp.Data.Value + t.md5hash.Write(resp.Data.Value) + } + }() + + return t, nil +} + +func (sc *SubChannel) Channel() chan []byte { + return sc.ch +} + +func (sc *SubChannel) Md5() []byte { + return sc.md5hash.Sum(nil) +} + +func (sc *SubChannel) Cancel() { + sc.cancel() +} diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go new file mode 100644 index 000000000..caa795626 --- /dev/null +++ b/weed/messaging/msgclient/subscriber.go @@ -0,0 +1,120 @@ +package msgclient + +import ( + "context" + "io" + "time" + "sync" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "google.golang.org/grpc" +) + +type Subscriber struct { + subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberCancels []context.CancelFunc + subscriberId string +} + +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) + + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + if partitionId>=0 && i != partitionId { + continue + } + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) + if err != nil { + return nil, err + } + subscriberClients[i] = client + subscriberCancels[i] = cancel + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberCancels: subscriberCancels, + subscriberId: subscriberId, + }, nil +} + +func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) + if err != nil { + return + } + + // send init message + err = stream.Send(&messaging_pb.SubscriberMessage{ + Init: &messaging_pb.SubscriberMessage_InitMessage{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, + StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, + TimestampNs: startTime.UnixNano(), + SubscriberId: subscriberId, + }, + }) + if err != nil { + return + } + + return stream, nil +} + +func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { + for { + resp, listenErr := subscriberClient.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + println(listenErr.Error()) + return listenErr + } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } + processFn(resp.Data) + } +} + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + var wg sync.WaitGroup + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberClients[i] != nil { + wg.Add(1) + go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) { + defer wg.Done() + doSubscribe(subscriberClient, processFn) + }(s.subscriberClients[i]) + } + } + wg.Wait() +} + +func (s *Subscriber) Shutdown() { + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberCancels[i] != nil { + s.subscriberCancels[i]() + } + } +} |
