diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/cluster/lock_manager/lock_manager.go | 2 | ||||
| -rw-r--r-- | weed/mount/dirty_pages_chunked.go | 7 | ||||
| -rw-r--r-- | weed/mq/balancer/allocate.go | 20 | ||||
| -rw-r--r-- | weed/mq/balancer/allocate_test.go | 62 | ||||
| -rw-r--r-- | weed/mq/balancer/balancer.go | 59 | ||||
| -rw-r--r-- | weed/mq/balancer/lookup.go | 43 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_admin.go | 4 | ||||
| -rw-r--r-- | weed/mq/topic_allocation/allocation.go | 81 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 7 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 2 | ||||
| -rw-r--r-- | weed/storage/store_ec_delete.go | 4 |
12 files changed, 199 insertions, 104 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index 0e3e47ba4..49b951dd9 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -139,7 +139,7 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, func (lm *LockManager) GetLockOwner(key string) (owner string, err error) { lm.locks.Range(func(k string, lock *Lock) bool { - if k == key { + if k == key && lock != nil { owner = lock.Owner return false } diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index 76a3be1be..25b071e7d 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -2,11 +2,12 @@ package mount import ( "fmt" + "io" + "sync" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mount/page_writer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "io" - "sync" ) type ChunkedDirtyPages struct { @@ -82,7 +83,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade } -func (pages ChunkedDirtyPages) Destroy() { +func (pages *ChunkedDirtyPages) Destroy() { pages.uploadPipeline.Shutdown() } diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go new file mode 100644 index 000000000..d594c60fb --- /dev/null +++ b/weed/mq/balancer/allocate.go @@ -0,0 +1,20 @@ +package balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { + return []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + } +} diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go new file mode 100644 index 000000000..c714788e6 --- /dev/null +++ b/weed/mq/balancer/allocate_test.go @@ -0,0 +1,62 @@ +package balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "reflect" + "testing" +) + +func Test_allocateOneBroker(t *testing.T) { + brokers := cmap.New[*BrokerStats]() + brokers.SetIfAbsent("localhost:17777", &BrokerStats{ + TopicPartitionCount: 0, + ConsumerCount: 0, + CpuUsagePercent: 0, + }) + + tests := []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment + }{ + { + name: "test only one broker", + args: args{ + brokers: brokers, + partitionCount: 6, + }, + wantAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + }, + }, + } + testThem(t, tests) +} + +type args struct { + brokers cmap.ConcurrentMap[string, *BrokerStats] + partitionCount int +} + +func testThem(t *testing.T, tests []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment +}) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { + t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) + } + }) + } +} diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 74871925f..837dc0ce3 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -1,16 +1,67 @@ package balancer import ( + "fmt" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +const ( + MaxPartitionCount = 8 * 9 * 5 * 7 //2520 ) type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] } + type BrokerStats struct { TopicPartitionCount int32 ConsumerCount int32 CpuUsagePercent int32 + Stats cmap.ConcurrentMap[string, *TopicPartitionStats] +} + +func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { + bs.TopicPartitionCount = int32(len(stats.Stats)) + bs.CpuUsagePercent = stats.CpuUsagePercent + + var consumerCount int32 + currentTopicPartitions := bs.Stats.Items() + for _, topicPartitionStats := range stats.Stats { + tps := &TopicPartitionStats{ + TopicPartition: TopicPartition{ + Namespace: topicPartitionStats.Topic.Namespace, + Topic: topicPartitionStats.Topic.Name, + RangeStart: topicPartitionStats.Partition.RangeStart, + RangeStop: topicPartitionStats.Partition.RangeStop, + }, + ConsumerCount: topicPartitionStats.ConsumerCount, + IsLeader: topicPartitionStats.IsLeader, + } + consumerCount += topicPartitionStats.ConsumerCount + key := tps.TopicPartition.String() + bs.Stats.Set(key, tps) + delete(currentTopicPartitions, key) + } + // remove the topic partitions that are not in the stats + for key := range currentTopicPartitions { + bs.Stats.Remove(key) + } + bs.ConsumerCount = consumerCount + +} + +type TopicPartition struct { + Namespace string + Topic string + RangeStart int32 + RangeStop int32 +} + +type TopicPartitionStats struct { + TopicPartition + ConsumerCount int32 + IsLeader bool } func NewBalancer() *Balancer { @@ -20,5 +71,11 @@ func NewBalancer() *Balancer { } func NewBrokerStats() *BrokerStats { - return &BrokerStats{} + return &BrokerStats{ + Stats: cmap.New[*TopicPartitionStats](), + } +} + +func (tp *TopicPartition) String() string { + return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop) } diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go new file mode 100644 index 000000000..55ed3e95d --- /dev/null +++ b/weed/mq/balancer/lookup.go @@ -0,0 +1,43 @@ +package balancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { + // find existing topic partition assignments + for brokerStatsItem := range b.Brokers.IterBuffered() { + broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val + for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { + topicPartitionStat := topicPartitionStatsItem.Val + if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && + topicPartitionStat.TopicPartition.Topic == topic.Name { + assignment := &mq_pb.BrokerPartitionAssignment{ + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: topicPartitionStat.RangeStart, + RangeStop: topicPartitionStat.RangeStop, + }, + } + if topicPartitionStat.IsLeader { + assignment.LeaderBroker = broker + } else { + assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) + } + assignments = append(assignments, assignment) + } + } + } + if len(assignments) > 0 { + return assignments, nil + } + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + // t := topic.FromPbTopic(request.Topic) + return allocateTopicPartitions(b.Brokers, 6), nil +} diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go index 7337ba23e..b24bf08a4 100644 --- a/weed/mq/broker/broker_grpc_admin.go +++ b/weed/mq/broker/broker_grpc_admin.go @@ -12,10 +12,6 @@ import ( "sync" ) -const ( - MaxPartitionCount = 1024 -) - func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { ret := &mq_pb.FindBrokerLeaderResponse{} err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go deleted file mode 100644 index a07ce4884..000000000 --- a/weed/mq/topic_allocation/allocation.go +++ /dev/null @@ -1,81 +0,0 @@ -package topic_allocation - -import ( - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "modernc.org/mathutil" -) - -const ( - DefaultBrokerCount = 4 -) - -// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions -func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) { - // create a previous assignment if not exists - if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 { - prevAssignment = &mq_pb.TopicPartitionsAssignment{ - PartitionCount: topic.PartitionCount, - } - partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount - for i := 0; i < DefaultBrokerCount; i++ { - prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ - PartitionStart: int32(i * partitionCountForEachBroker), - PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount), - }) - } - } - - // create a new assignment - assignment = &mq_pb.TopicPartitionsAssignment{ - PartitionCount: prevAssignment.PartitionCount, - } - - // allocate partitions for each partition range - for _, brokerPartition := range prevAssignment.BrokerPartitions { - // allocate partitions for each partition range - leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers) - if err != nil { - return nil, err - } - - followerBrokers := make([]string, len(followers)) - for i, follower := range followers { - followerBrokers[i] = string(follower) - } - - assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ - PartitionStart: brokerPartition.PartitionStart, - PartitionStop: brokerPartition.PartitionStop, - LeaderBroker: string(leader), - FollowerBrokers: followerBrokers, - }) - } - - return -} - -func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) { - // allocate leader - leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers) - if err != nil { - return - } - - // allocate followers - followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers) - if err != nil { - return - } - - return -} - -func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) { - return -} - -func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) { - return -} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 98784bce3..788ee150a 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -94,9 +94,6 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 - - // client to assign file id - assignProxy *operation.AssignProxy } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -135,8 +132,6 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepMasterClientConnected() - fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16) - if !util.LoadConfiguration("filer", false) { v.SetDefault("leveldb2.enabled", true) v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) @@ -189,7 +184,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) - return fs, err + return fs, nil } func (fs *FilerServer) checkWithMaster() { diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 454148e66..54ddfb8b2 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -30,12 +30,6 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { return } - isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" - if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { - writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) - return - } - // proxy to volume servers var fileId string if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") { @@ -48,6 +42,12 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { return } + isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" + if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + stats.FilerRequestCounter.WithLabelValues(r.Method).Inc() defer func() { stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds()) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index daf63fa8d..898975d14 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -43,7 +43,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := fs.assignProxy.Assign(ar, altRequest) + assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) err = ae diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index 4b24cae79..a3e028bbb 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -36,7 +36,9 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version) - + if err != nil { + return err + } if len(intervals) == 0 { return erasure_coding.NotFoundError } |
