aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-17 00:15:52 -0800
committerchrislu <chris.lu@gmail.com>2024-01-17 00:15:52 -0800
commitbc8d2a01cc9b451faf8e24d04bbc5931ee487a73 (patch)
tree49e153d73e63bc0738d828c753474e62527b8a77 /weed
parentfdf0ea8e11fd0f93f6ff1a273565c4b7cb115e2c (diff)
downloadseaweedfs-bc8d2a01cc9b451faf8e24d04bbc5931ee487a73.tar.xz
seaweedfs-bc8d2a01cc9b451faf8e24d04bbc5931ee487a73.zip
create local topic partition from config on filer
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_grpc_configure.go6
-rw-r--r--weed/mq/broker/broker_grpc_pub.go51
-rw-r--r--weed/mq/broker/broker_grpc_sub.go41
-rw-r--r--weed/mq/broker/broker_server.go12
4 files changed, 81 insertions, 29 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 83a26446c..dcc621c4c 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -113,7 +113,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else {
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
+ localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
}
}
@@ -139,7 +139,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
return ret, nil
}
-func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
+func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
@@ -166,7 +166,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Pa
}
}
-func (b *MessageQueueBroker) genLogOnDiskReadFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
+func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index e0e138ef2..e8238a5f7 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -3,10 +3,13 @@ package broker
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
+ jsonpb "google.golang.org/protobuf/encoding/protojson"
"math/rand"
"net"
"sync/atomic"
@@ -54,9 +57,13 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- return stream.Send(response)
+ localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p)
+ // if not created, return error
+ if err != nil {
+ response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
+ glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
+ return stream.Send(response)
+ }
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
@@ -141,6 +148,44 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return nil
}
+func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
+ // load local topic partition from configuration on filer if not found
+ var conf *mq_pb.ConfigureTopicResponse
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
+ if err != nil {
+ return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err)
+ }
+ // parse into filer conf object
+ conf = &mq_pb.ConfigureTopicResponse{}
+ if err = jsonpb.Unmarshal(data, conf); err != nil {
+ return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err)
+ }
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+
+ // create local topic partition
+ self := b.option.BrokerAddress()
+ var hasCreated bool
+ for _, assignment := range conf.BrokerPartitionAssignments {
+ if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) {
+ localTopicPartition = topic.FromPbBrokerPartitionAssignment(self, p, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ b.localTopicManager.AddTopicPartition(t, localTopicPartition)
+ hasCreated = true
+ break
+ }
+ }
+
+ if !hasCreated {
+ return nil, fmt.Errorf("topic %v partition %v not assigned to broker %v", t, p, self)
+ }
+
+ return localTopicPartition, nil
+}
+
// duplicated from master_grpc_server.go
func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index d6114ad23..2f4af3be9 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -11,7 +11,7 @@ import (
"time"
)
-func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
+func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {
ctx := stream.Context()
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
@@ -24,28 +24,31 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var localTopicPartition *topic.LocalPartition
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
for localTopicPartition == nil {
- stream.Send(&mq_pb.SubscribeMessageResponse{
- Message: &mq_pb.SubscribeMessageResponse_Ctrl{
- Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
- Error: "not initialized",
+ localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, partition)
+ // if not created, return error
+ if err != nil {
+ stream.Send(&mq_pb.SubscribeMessageResponse{
+ Message: &mq_pb.SubscribeMessageResponse_Ctrl{
+ Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
+ Error: fmt.Sprintf("topic %v partition %v not setup: %v", t, partition, err),
+ },
},
- },
- })
- time.Sleep(337 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
+ })
+ time.Sleep(337 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return nil
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return nil
+ default:
+ // Continue processing the request
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
}
- localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 1a2c09ca4..9ef277f4d 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -32,6 +32,10 @@ type MessageQueueBrokerOption struct {
VolumeServerAccess string // how to access volume servers
}
+func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress {
+ return pb.NewServerAddress(option.Ip, option.Port, 0)
+}
+
type MessageQueueBroker struct {
mq_pb.UnimplementedSeaweedMessagingServer
option *MessageQueueBrokerOption
@@ -55,7 +59,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
Balancer: pub_broker_balancer,
@@ -76,13 +80,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
for mqBroker.currentFiler == "" {
time.Sleep(time.Millisecond * 237)
}
- self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
+ self := option.BrokerAddress()
glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
- mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, self)
+ mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self))
for {
- err := mqBroker.BrokerConnectToBalancer(self)
+ err := mqBroker.BrokerConnectToBalancer(string(self))
if err != nil {
fmt.Printf("BrokerConnectToBalancer: %v\n", err)
}