diff options
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 4 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server.go | 4 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 35 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 30 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 12 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 24 | ||||
| -rw-r--r-- | weed/messaging/broker/consistent_distribution.go | 38 | ||||
| -rw-r--r-- | weed/messaging/broker/consistent_distribution_test.go | 32 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_lock.go | 17 | ||||
| -rw-r--r-- | weed/messaging/msgclient/pub_sub_chan.go | 38 | ||||
| -rw-r--r-- | weed/messaging/msgclient/publisher.go | 11 | ||||
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 12 |
12 files changed, 216 insertions, 41 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index e87e197b0..80f107e00 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -3,6 +3,7 @@ package broker import ( "context" "fmt" + "io" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" @@ -94,6 +95,9 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient for _, filer := range broker.option.Filers { if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + if err == io.EOF { + return + } glog.V(0).Infof("fail to connect to %s: %v", filer, err) } else { break diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go index 447620a6b..32dab6813 100644 --- a/weed/messaging/broker/broker_grpc_server.go +++ b/weed/messaging/broker/broker_grpc_server.go @@ -10,6 +10,10 @@ func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messagin panic("implement me") } +func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) { + panic("implement me") +} + func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { panic("implement me") } diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go index 4b7f357fa..3c14f3220 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -2,6 +2,7 @@ package broker import ( "context" + "fmt" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -25,12 +26,40 @@ If one of the pub or sub connects very late, and the system topo changed quite a func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) { - panic("implement me") -} + t := &messaging_pb.FindBrokerResponse{} + var peers []string + + targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) + + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ + Resource: targetTopicPartition, + }) + if err != nil { + return err + } + if resp.Found && len(resp.Resources) > 0 { + t.Broker = resp.Resources[0].GrpcAddresses + return nil + } + for _, b := range resp.Resources { + peers = append(peers, b.GrpcAddresses) + } + return nil + }) + if err != nil { + return nil, err + } + } + + t.Broker = PickMember(peers, []byte(targetTopicPartition)) + return t, nil +} -func (broker *MessageBroker) checkPeers() { +func (broker *MessageBroker) checkFilers() { // contact a filer about masters var masters []string diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index b3a909a6c..61e53b433 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -47,24 +47,11 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis tl := broker.topicLocks.RequestLock(tp, topicConfig, true) defer broker.topicLocks.ReleaseLock(tp, true) - updatesChan := make(chan int32) - - go func() { - for update := range updatesChan { - if err := stream.Send(&messaging_pb.PublishResponse{ - Config: &messaging_pb.PublishResponse_ConfigMessage{ - PartitionCount: update, - }, - }); err != nil { - glog.V(0).Infof("err sending publish response: %v", err) - return - } - } - }() - // process each message for { + // println("recv") in, err := stream.Recv() + // glog.V(0).Infof("recieved %v err: %v", in, err) if err == io.EOF { return nil } @@ -86,5 +73,18 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis tl.logBuffer.AddToBuffer(in.Data.Key, data) + if in.Data.IsClose { + // println("server received closing") + break + } + } + + // send the close ack + // println("server send ack closing") + if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil { + glog.V(0).Infof("err sending close response: %v", err) + } + return nil + } diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 472a5007b..761129e80 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -83,11 +83,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) return err } + if m.IsClose { + // println("processed EOF") + return io.EOF + } processedTsNs = logEntry.TsNs + messageCount++ return nil } if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { + // println("stopping from persisted logs") return err } @@ -95,7 +101,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lastReadTime = time.Unix(0, processedTsNs) } - messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() lock.Mutex.Unlock() @@ -124,7 +130,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim return nil } } - if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){ + if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) { return nil } // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) @@ -133,7 +139,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { - return nil + return err } return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) } diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 9cad27214..e6ff2cf00 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -36,7 +36,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio messageBroker.topicLocks = NewTopicLocks(messageBroker) - messageBroker.checkPeers() + messageBroker.checkFilers() go messageBroker.keepConnectedToOneFiler() @@ -53,6 +53,24 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) return err } + + initRequest := &filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + } + for _, tp := range broker.topicLocks.ListTopicPartitions() { + initRequest.Resources = append(initRequest.Resources, tp.String()) + } + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + + // TODO send events of adding/removing topics + glog.V(0).Infof("conntected with filer: %v", filer) for { if err := stream.Send(&filer_pb.KeepConnectedRequest{ @@ -68,12 +86,12 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { return err } // println("received reply") - time.Sleep(11*time.Second) + time.Sleep(11 * time.Second) // println("woke up") } return nil }) - time.Sleep(3*time.Second) + time.Sleep(3 * time.Second) } } diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go new file mode 100644 index 000000000..dd7d34f86 --- /dev/null +++ b/weed/messaging/broker/consistent_distribution.go @@ -0,0 +1,38 @@ +package broker + +import ( + "github.com/cespare/xxhash" + "github.com/buraksezer/consistent" +) + +type Member string + +func (m Member) String() string { + return string(m) +} + +type hasher struct{} + +func (h hasher) Sum64(data []byte) uint64 { + return xxhash.Sum64(data) +} + +func PickMember(members []string, key []byte) string { + cfg := consistent.Config{ + PartitionCount: 9791, + ReplicationFactor: 2, + Load: 1.25, + Hasher: hasher{}, + } + + cmembers := []consistent.Member{} + for _, m := range members { + cmembers = append(cmembers, Member(m)) + } + + c := consistent.New(cmembers, cfg) + + m := c.LocateKey(key) + + return m.String() +}
\ No newline at end of file diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go new file mode 100644 index 000000000..192516092 --- /dev/null +++ b/weed/messaging/broker/consistent_distribution_test.go @@ -0,0 +1,32 @@ +package broker + +import ( + "fmt" + "testing" +) + +func TestPickMember(t *testing.T) { + + servers := []string{ + "s1:port", + "s2:port", + "s3:port", + "s5:port", + "s4:port", + } + + total := 1000 + + distribution := make(map[string]int) + for i:=0;i<total;i++{ + tp := fmt.Sprintf("tp:%2d", i) + m := PickMember(servers, []byte(tp)) + // println(tp, "=>", m) + distribution[m]++ + } + + for member, count := range distribution { + fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers))) + } + +}
\ No newline at end of file diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go index f8a5aa171..f3a66a2f5 100644 --- a/weed/messaging/broker/topic_lock.go +++ b/weed/messaging/broker/topic_lock.go @@ -16,6 +16,13 @@ type TopicPartition struct { Topic string Partition int32 } +const ( + TopicPartitionFmt = "%s/%s_%2d" +) +func (tp *TopicPartition) String() string { + return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) +} + type TopicLock struct { sync.Mutex cond *sync.Cond @@ -101,3 +108,13 @@ func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { delete(tl.locks, partition) } } + +func (tl *TopicLocks) ListTopicPartitions() (tps []TopicPartition) { + tl.Lock() + defer tl.Unlock() + + for k := range tl.locks { + tps = append(tps, k) + } + return +} diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_sub_chan.go index d39e4c658..a11240080 100644 --- a/weed/messaging/msgclient/pub_sub_chan.go +++ b/weed/messaging/msgclient/pub_sub_chan.go @@ -5,12 +5,15 @@ import ( "log" "time" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) type PubChannel struct { - client messaging_pb.SeaweedMessaging_PublishClient + client messaging_pb.SeaweedMessaging_PublishClient + grpcConnection *grpc.ClientConn } func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { @@ -28,7 +31,8 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { return nil, err } return &PubChannel{ - client: pc, + client: pc, + grpcConnection: grpcConnection, }, nil } @@ -40,7 +44,24 @@ func (pc *PubChannel) Publish(m []byte) error { }) } func (pc *PubChannel) Close() error { - return pc.client.CloseSend() + + // println("send closing") + if err := pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + IsClose: true, + }, + }); err != nil { + log.Printf("err send close: %v", err) + } + // println("receive closing") + if _, err := pc.client.Recv(); err != nil && err != io.EOF { + log.Printf("err receive close: %v", err) + } + // println("close connection") + if err := pc.grpcConnection.Close(); err != nil { + log.Printf("err connection close: %v", err) + } + return nil } type SubChannel struct { @@ -58,7 +79,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { if err != nil { return nil, err } - sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0,0)) + sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0, 0)) if err != nil { return nil, err } @@ -78,13 +99,14 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { log.Printf("fail to receive from netchan %s: %v", chanName, subErr) return } - if resp.IsClose { + if resp.Data.IsClose { + t.stream.Send(&messaging_pb.SubscriberMessage{ + IsClose: true, + }) close(t.ch) return } - if resp.Data != nil { - t.ch <- resp.Data.Value - } + t.ch <- resp.Data.Value } }() diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go index b0459494b..08f1d278a 100644 --- a/weed/messaging/msgclient/publisher.go +++ b/weed/messaging/msgclient/publisher.go @@ -4,9 +4,9 @@ import ( "context" "github.com/OneOfOne/xxhash" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -16,7 +16,7 @@ type Publisher struct { messageCount uint64 publisherId string } - +/* func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ @@ -24,7 +24,11 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* } publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupPublisherClient(namespace, topic, int32(i)) + client, err := setupPublisherClient(broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + }) if err != nil { return nil, err } @@ -35,6 +39,7 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* topicConfiguration: topicConfiguration, }, nil } +*/ func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 27fa35a5b..d3066d6ef 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -5,6 +5,7 @@ import ( "io" "time" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -13,6 +14,7 @@ type Subscriber struct { subscriberId string } +/* func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ @@ -36,9 +38,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { - stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime) + stream, err := setupSubscriberClient(subscriberId, namespace, topic, partition, startTime) if err != nil { - return client, err + return stream, err } if newBroker != nil { @@ -47,6 +49,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic return stream, nil } +*/ func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) @@ -70,13 +73,10 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, } // process init response - initResponse, err := stream.Recv() + _, err = stream.Recv() if err != nil { return } - if initResponse.Redirect != nil { - // TODO follow redirection - } return stream, nil } |
