aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-08-28 09:02:12 -0700
committerchrislu <chris.lu@gmail.com>2023-08-28 09:02:12 -0700
commit1eb2da46d5d5a52c1012aa19ef31c1c8ed568d9e (patch)
treed808ee7c62bffa0a86ad908b9b533652f7227ef3 /weed
parent504ae8383ac3a0838d31d04b31623872b5734b31 (diff)
downloadseaweedfs-1eb2da46d5d5a52c1012aa19ef31c1c8ed568d9e.tar.xz
seaweedfs-1eb2da46d5d5a52c1012aa19ef31c1c8ed568d9e.zip
connect and publish
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go2
-rw-r--r--weed/mq/broker/broker_grpc_pub.go19
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go29
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go (renamed from weed/mq/client/sub_client/subscriber.go)0
-rw-r--r--weed/mq/client/pub_client/lookup.go74
-rw-r--r--weed/mq/client/pub_client/publish.go34
-rw-r--r--weed/mq/client/pub_client/publisher.go73
-rw-r--r--weed/mq/client/sub_client/lookup.go74
-rw-r--r--weed/mq/client/sub_client/subscribe.go28
-rw-r--r--weed/mq/topic/local_partition.go19
10 files changed, 294 insertions, 58 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 4f9eb5182..96448be83 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -17,7 +17,7 @@ import (
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
-func (broker *MessageQueueBroker) FindTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) {
+func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) {
ret := &mq_pb.LookupTopicBrokersResponse{}
// TODO lock the topic
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 6e769b2fa..d8f33c2a5 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -79,16 +79,17 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
}
response := &mq_pb.PublishResponse{}
// TODO check whether current broker should be the leader for the topic partition
- if initMessage := req.GetInit(); initMessage != nil {
- localTopicPartition = broker.localTopicManager.GetTopicPartition(
- topic.FromPbTopic(initMessage.Topic),
- topic.FromPbPartition(initMessage.Partition),
- )
+ initMessage := req.GetInit()
+ if initMessage != nil {
+ t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+ localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
- response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
- return stream.Send(response)
+ localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
}
+ } else {
+ response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
+ glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
+ return stream.Send(response)
}
// process each published messages
@@ -104,7 +105,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
AckSequence: sequence,
}
if dataMessage := req.GetData(); dataMessage != nil {
- print('+')
+ print("+")
localTopicPartition.Publish(dataMessage)
}
if err := stream.Send(response); err != nil {
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
new file mode 100644
index 000000000..a540143a4
--- /dev/null
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -0,0 +1,29 @@
+package main
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+)
+
+func main() {
+
+ publisher := pub_client.NewTopicPublisher(
+ "test", "test")
+ if err := publisher.Connect("localhost:17777"); err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ for i := 0; i < 10; i++ {
+ if dataErr := publisher.Publish(
+ []byte(fmt.Sprintf("key-%d", i)),
+ []byte(fmt.Sprintf("value-%d", i)),
+ ); dataErr != nil {
+ fmt.Println(dataErr)
+ return
+ }
+ }
+
+ fmt.Println("done publishing")
+
+}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index b7ae2fe10..b7ae2fe10 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
new file mode 100644
index 000000000..c54b2687d
--- /dev/null
+++ b/weed/mq/client/pub_client/lookup.go
@@ -0,0 +1,74 @@
+package pub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc"
+)
+
+func (p *TopicPublisher) doLookup(
+ brokerAddress string, grpcDialOption grpc.DialOption) error {
+ err := pb.WithBrokerGrpcClient(true,
+ brokerAddress,
+ grpcDialOption,
+ func(client mq_pb.SeaweedMessagingClient) error {
+ lookupResp, err := client.LookupTopicBrokers(context.Background(),
+ &mq_pb.LookupTopicBrokersRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ IsForPublish: true,
+ })
+ if err != nil {
+ return err
+ }
+ for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
+ // partition => broker
+ p.partition2Broker.Insert(
+ brokerPartitionAssignment.Partition.RangeStart,
+ brokerPartitionAssignment.Partition.RangeStop,
+ brokerPartitionAssignment.LeaderBroker)
+
+ // broker => publish client
+ // send init message
+ // save the publishing client
+ brokerAddress := brokerPartitionAssignment.LeaderBroker
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ publishClient, err := brokerClient.Publish(context.Background())
+ if err != nil {
+ return fmt.Errorf("create publish client: %v", err)
+ }
+ p.broker2PublishClient.Set(brokerAddress, publishClient)
+ if err = publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Init{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: brokerPartitionAssignment.Partition.RingSize,
+ RangeStart: brokerPartitionAssignment.Partition.RangeStart,
+ RangeStop: brokerPartitionAssignment.Partition.RangeStop,
+ },
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("send init message: %v", err)
+ }
+ }
+ return nil
+ })
+
+ if err != nil {
+ return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
+ }
+ return nil
+}
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
new file mode 100644
index 000000000..0ecb55c9b
--- /dev/null
+++ b/weed/mq/client/pub_client/publish.go
@@ -0,0 +1,34 @@
+package pub_client
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/broker"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func (p *TopicPublisher) Publish(key, value []byte) error {
+ hashKey := util.HashToInt32(key) % broker.MaxPartitionCount
+ if hashKey < 0 {
+ hashKey = -hashKey
+ }
+ brokerAddress, found := p.partition2Broker.Floor(hashKey, hashKey)
+ if !found {
+ return fmt.Errorf("no broker found for key %d", hashKey)
+ }
+ publishClient, found := p.broker2PublishClient.Get(brokerAddress)
+ if !found {
+ return fmt.Errorf("no publish client found for broker %s", brokerAddress)
+ }
+ if err := publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Data{
+ Data: &mq_pb.DataMessage{
+ Key: key,
+ Value: value,
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("send publish request: %v", err)
+ }
+ return nil
+}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 8be027ac7..171b5ebd7 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -1,59 +1,36 @@
-package main
+package pub_client
import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/rdleal/intervalst/interval"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
-func main() {
-
- err := pb.WithBrokerGrpcClient(true,
- "localhost:17777",
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- func(client mq_pb.SeaweedMessagingClient) error {
- pubClient, err := client.Publish(context.Background())
- if err != nil {
- return err
- }
- if initErr := pubClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: "test",
- Name: "test",
- },
- Partition: &mq_pb.Partition{
- RangeStart: 0,
- RangeStop: 1,
- RingSize: 1,
- },
- },
- },
- }); initErr != nil {
- return initErr
- }
-
- for i := 0; i < 10; i++ {
- if dataErr := pubClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Data{
- Data: &mq_pb.DataMessage{
- Key: []byte(fmt.Sprintf("key-%d", i)),
- Value: []byte(fmt.Sprintf("value-%d", i)),
- },
- },
- }); dataErr != nil {
- return dataErr
- }
- }
- return nil
- })
+type PublisherConfiguration struct {
+}
+type TopicPublisher struct {
+ namespace string
+ topic string
+ partition2Broker *interval.SearchTree[string, int32]
+ broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient]
+}
- if err != nil {
- fmt.Println(err)
+func NewTopicPublisher(namespace, topic string) *TopicPublisher {
+ return &TopicPublisher{
+ namespace: namespace,
+ topic: topic,
+ partition2Broker: interval.NewSearchTree[string](func(a, b int32) int {
+ return int(a - b)
+ }),
+ broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
}
+}
+func (p *TopicPublisher) Connect(bootstrapBroker string) error {
+ if err := p.doLookup(bootstrapBroker, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
+ return err
+ }
+ return nil
}
diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go
new file mode 100644
index 000000000..89d3d2c45
--- /dev/null
+++ b/weed/mq/client/sub_client/lookup.go
@@ -0,0 +1,74 @@
+package sub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc"
+)
+
+func (p *TopicSubscriber) doLookup(
+ brokerAddress string, grpcDialOption grpc.DialOption) error {
+ err := pb.WithBrokerGrpcClient(true,
+ brokerAddress,
+ grpcDialOption,
+ func(client mq_pb.SeaweedMessagingClient) error {
+ lookupResp, err := client.LookupTopicBrokers(context.Background(),
+ &mq_pb.LookupTopicBrokersRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ IsForPublish: true,
+ })
+ if err != nil {
+ return err
+ }
+ for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
+ // partition => broker
+ p.partition2Broker.Insert(
+ brokerPartitionAssignment.Partition.RangeStart,
+ brokerPartitionAssignment.Partition.RangeStop,
+ brokerPartitionAssignment.LeaderBroker)
+
+ // broker => publish client
+ // send init message
+ // save the publishing client
+ brokerAddress := brokerPartitionAssignment.LeaderBroker
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ publishClient, err := brokerClient.Publish(context.Background())
+ if err != nil {
+ return fmt.Errorf("create publish client: %v", err)
+ }
+ p.broker2PublishClient.Set(brokerAddress, publishClient)
+ if err = publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Init{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: brokerPartitionAssignment.Partition.RingSize,
+ RangeStart: brokerPartitionAssignment.Partition.RangeStart,
+ RangeStop: brokerPartitionAssignment.Partition.RangeStop,
+ },
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("send init message: %v", err)
+ }
+ }
+ return nil
+ })
+
+ if err != nil {
+ return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
+ }
+ return nil
+}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
new file mode 100644
index 000000000..158c93010
--- /dev/null
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -0,0 +1,28 @@
+package sub_client
+
+import (
+ cmap "github.com/orcaman/concurrent-map"
+ "github.com/rdleal/intervalst/interval"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+type SubscriberConfiguration struct {
+}
+
+type TopicSubscriber struct {
+ namespace string
+ topic string
+ partition2Broker *interval.SearchTree[string, int32]
+ broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient]
+}
+
+func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber {
+ return &TopicSubscriber{
+ namespace: namespace,
+ topic: topic,
+ partition2Broker: interval.NewSearchTree[string](func(a, b int32) int {
+ return int(a - b)
+ }),
+ broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
+ }
+}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index a87922d9c..eaedb9f20 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -1,6 +1,7 @@
package topic
import (
+ "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -15,6 +16,24 @@ type LocalPartition struct {
logBuffer *log_buffer.LogBuffer
}
+func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
+ return &LocalPartition{
+ Partition: partition,
+ isLeader: isLeader,
+ FollowerBrokers: followerBrokers,
+ logBuffer: log_buffer.NewLogBuffer(
+ fmt.Sprintf("%s/%s/%4d-%4d", topic.Namespace, topic.Name, partition.RangeStart, partition.RangeStop),
+ 2*time.Minute,
+ func(startTime, stopTime time.Time, buf []byte) {
+
+ },
+ func() {
+
+ },
+ ),
+ }
+}
+
type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error
func (p LocalPartition) Publish(message *mq_pb.DataMessage) {