diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_configure.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_configure.go | 39 |
1 files changed, 35 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 7f7c8f84b..467ceb81d 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -3,12 +3,16 @@ package broker import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "time" ) // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer @@ -33,7 +37,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) for _, bpa := range ret.BrokerPartitionAssignments { - // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker) + fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ Topic: request.Topic, @@ -62,7 +66,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } } - // TODO revert if some error happens in the middle of the assignments + glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, ret.BrokerPartitionAssignments) return ret, err } @@ -73,8 +77,8 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port)) // drain existing topic partition subscriptions - for _, brokerPartition := range request.BrokerPartitionAssignments { - localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition) + for _, assignment := range request.BrokerPartitionAssignments { + localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition)) if request.IsDraining { // TODO drain existing topic partition subscriptions @@ -103,5 +107,32 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } } + glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } + +func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType { + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) + partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop) + + return func(startTime, stopTime time.Time, buf []byte) { + if len(buf) == 0 { + return + } + + startTime, stopTime = startTime.UTC(), stopTime.UTC() + fileName := startTime.Format(topic.TIME_FORMAT) + + targetFile := fmt.Sprintf("%s/%s",partitionDir, fileName) + + for { + if err := b.appendToFile(targetFile, buf); err != nil { + glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) + time.Sleep(737 * time.Millisecond) + } else { + break + } + } + } +} |
