aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_configure.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_configure.go')
-rw-r--r--weed/mq/broker/broker_grpc_configure.go39
1 files changed, 35 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 7f7c8f84b..467ceb81d 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -3,12 +3,16 @@ package broker
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "time"
)
// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@@ -33,7 +37,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
for _, bpa := range ret.BrokerPartitionAssignments {
- // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
+ fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker)
if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
_, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
Topic: request.Topic,
@@ -62,7 +66,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
}
- // TODO revert if some error happens in the middle of the assignments
+ glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, ret.BrokerPartitionAssignments)
return ret, err
}
@@ -73,8 +77,8 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
// drain existing topic partition subscriptions
- for _, brokerPartition := range request.BrokerPartitionAssignments {
- localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
+ for _, assignment := range request.BrokerPartitionAssignments {
+ localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition))
if request.IsDraining {
// TODO drain existing topic partition subscriptions
@@ -103,5 +107,32 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
}
}
+ glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil
}
+
+func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
+ topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+ partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
+ partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
+
+ return func(startTime, stopTime time.Time, buf []byte) {
+ if len(buf) == 0 {
+ return
+ }
+
+ startTime, stopTime = startTime.UTC(), stopTime.UTC()
+ fileName := startTime.Format(topic.TIME_FORMAT)
+
+ targetFile := fmt.Sprintf("%s/%s",partitionDir, fileName)
+
+ for {
+ if err := b.appendToFile(targetFile, buf); err != nil {
+ glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ time.Sleep(737 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ }
+}