aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go87
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go1
-rw-r--r--weed/messaging/broker/broker_server.go76
-rw-r--r--weed/messaging/client/client.go30
-rw-r--r--weed/messaging/msgclient/client.go56
-rw-r--r--weed/messaging/msgclient/pub_sub_chan.go96
-rw-r--r--weed/messaging/msgclient/publisher.go (renamed from weed/messaging/client/publisher.go)14
-rw-r--r--weed/messaging/msgclient/subscriber.go (renamed from weed/messaging/client/subscriber.go)25
8 files changed, 289 insertions, 96 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
new file mode 100644
index 000000000..4b7f357fa
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -0,0 +1,87 @@
+package broker
+
+import (
+ "context"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+/*
+Topic discovery:
+
+When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
+
+The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
+Otherwise, just host the topic.
+
+So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
+If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
+
+*/
+
+func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
+
+ panic("implement me")
+}
+
+
+
+func (broker *MessageBroker) checkPeers() {
+
+ // contact a filer about masters
+ var masters []string
+ found := false
+ for !found {
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received master list: %s", masters)
+
+ // contact each masters for filers
+ var filers []string
+ found = false
+ for !found {
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to list filers: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received filer list: %s", filers)
+
+ broker.option.Filers = filers
+
+}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 379063eed..472a5007b 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -38,7 +38,6 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
}
if err = stream.Send(&messaging_pb.BrokerMessage{
- Redirect: nil,
}); err != nil {
return err
}
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 29c227274..9cad27214 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -16,6 +16,7 @@ type MessageBrokerOption struct {
Filers []string
DefaultReplication string
MaxMB int
+ Ip string
Port int
Cipher bool
}
@@ -37,73 +38,44 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
messageBroker.checkPeers()
- // go messageBroker.loopForEver()
+ go messageBroker.keepConnectedToOneFiler()
return messageBroker, nil
}
-func (broker *MessageBroker) loopForEver() {
+func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
- broker.checkPeers()
- time.Sleep(3 * time.Second)
- }
-
-}
-
-func (broker *MessageBroker) checkPeers() {
-
- // contact a filer about masters
- var masters []string
- found := false
- for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.KeepConnected(context.Background())
if err != nil {
+ glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- masters = append(masters, resp.Masters...)
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []string
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
- ClientType: "filer",
- })
- if err != nil {
- return err
+ glog.V(0).Infof("conntected with filer: %v", filer)
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("send heartbeat")
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("received reply")
+ time.Sleep(11*time.Second)
+ // println("woke up")
}
-
- filers = append(filers, resp.GrpcAddresses...)
-
return nil
})
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
+ time.Sleep(3*time.Second)
}
}
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
}
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go
deleted file mode 100644
index 4a674a9fc..000000000
--- a/weed/messaging/client/client.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package client
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnection *grpc.ClientConn
-}
-
-func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
-
- grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
- if err != nil {
- return nil, err
- }
-
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnection: grpcConnection,
- }, nil
-}
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
new file mode 100644
index 000000000..f4e11232e
--- /dev/null
+++ b/weed/messaging/msgclient/client.go
@@ -0,0 +1,56 @@
+package msgclient
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessagingClient struct {
+ bootstrapBrokers []string
+ grpcConnections map[broker.TopicPartition]*grpc.ClientConn
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
+ return &MessagingClient{
+ bootstrapBrokers: bootstrapBrokers,
+ grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
+ }
+}
+
+
+func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
+
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
+ &messaging_pb.FindBrokerRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Parition: tp.Partition,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ targetBroker := resp.Broker
+ return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
+ }
+ return nil, fmt.Errorf("no broker found for %+v", tp)
+}
diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_sub_chan.go
new file mode 100644
index 000000000..d39e4c658
--- /dev/null
+++ b/weed/messaging/msgclient/pub_sub_chan.go
@@ -0,0 +1,96 @@
+package msgclient
+
+import (
+ "io"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type PubChannel struct {
+ client messaging_pb.SeaweedMessaging_PublishClient
+}
+
+func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ pc, err := setupPublisherClient(grpcConnection, tp)
+ if err != nil {
+ return nil, err
+ }
+ return &PubChannel{
+ client: pc,
+ }, nil
+}
+
+func (pc *PubChannel) Publish(m []byte) error {
+ return pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ Value: m,
+ },
+ })
+}
+func (pc *PubChannel) Close() error {
+ return pc.client.CloseSend()
+}
+
+type SubChannel struct {
+ ch chan []byte
+ stream messaging_pb.SeaweedMessaging_SubscribeClient
+}
+
+func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0,0))
+ if err != nil {
+ return nil, err
+ }
+
+ t := &SubChannel{
+ ch: make(chan []byte),
+ stream: sc,
+ }
+
+ go func() {
+ for {
+ resp, subErr := t.stream.Recv()
+ if subErr == io.EOF {
+ return
+ }
+ if subErr != nil {
+ log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
+ return
+ }
+ if resp.IsClose {
+ close(t.ch)
+ return
+ }
+ if resp.Data != nil {
+ t.ch <- resp.Data.Value
+ }
+ }
+ }()
+
+ return t, nil
+}
+
+func (sc *SubChannel) Channel() chan []byte {
+ return sc.ch
+}
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/msgclient/publisher.go
index 68e5729c1..b0459494b 100644
--- a/weed/messaging/client/publisher.go
+++ b/weed/messaging/msgclient/publisher.go
@@ -1,10 +1,12 @@
-package client
+package msgclient
import (
"context"
"github.com/OneOfOne/xxhash"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -34,9 +36,9 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*
}, nil
}
-func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) {
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
- stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
+ stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
if err != nil {
return nil, err
}
@@ -44,9 +46,9 @@ func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partiti
// send init message
err = stream.Send(&messaging_pb.PublishRequest{
Init: &messaging_pb.PublishRequest_InitMessage{
- Namespace: namespace,
- Topic: topic,
- Partition: partition,
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
},
})
if err != nil {
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/msgclient/subscriber.go
index 53e7ffc7d..27fa35a5b 100644
--- a/weed/messaging/client/subscriber.go
+++ b/weed/messaging/msgclient/subscriber.go
@@ -1,4 +1,4 @@
-package client
+package msgclient
import (
"context"
@@ -36,9 +36,22 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
- stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
+ stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime)
if err != nil {
- return nil, err
+ return client, err
+ }
+ if newBroker != nil {
+
+ }
+
+ return stream, nil
+
+}
+
+func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
+ if err != nil {
+ return
}
// send init message
@@ -53,20 +66,18 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic
},
})
if err != nil {
- return nil, err
+ return
}
// process init response
initResponse, err := stream.Recv()
if err != nil {
- return nil, err
+ return
}
if initResponse.Redirect != nil {
// TODO follow redirection
}
-
return stream, nil
-
}
func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {