diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/cluster/lock_manager/lock_manager.go | 2 | ||||
| -rw-r--r-- | weed/mount/dirty_pages_chunked.go | 7 | ||||
| -rw-r--r-- | weed/mq/balancer/allocate.go | 20 | ||||
| -rw-r--r-- | weed/mq/balancer/allocate_test.go | 62 | ||||
| -rw-r--r-- | weed/mq/balancer/balancer.go | 59 | ||||
| -rw-r--r-- | weed/mq/balancer/lookup.go | 43 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_admin.go | 4 | ||||
| -rw-r--r-- | weed/mq/topic_allocation/allocation.go | 81 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 7 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 2 | ||||
| -rw-r--r-- | weed/storage/store_ec_delete.go | 4 |
12 files changed, 104 insertions, 199 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index 49b951dd9..0e3e47ba4 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -139,7 +139,7 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, func (lm *LockManager) GetLockOwner(key string) (owner string, err error) { lm.locks.Range(func(k string, lock *Lock) bool { - if k == key && lock != nil { + if k == key { owner = lock.Owner return false } diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index 25b071e7d..76a3be1be 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -2,12 +2,11 @@ package mount import ( "fmt" - "io" - "sync" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mount/page_writer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "io" + "sync" ) type ChunkedDirtyPages struct { @@ -83,7 +82,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade } -func (pages *ChunkedDirtyPages) Destroy() { +func (pages ChunkedDirtyPages) Destroy() { pages.uploadPipeline.Shutdown() } diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go deleted file mode 100644 index d594c60fb..000000000 --- a/weed/mq/balancer/allocate.go +++ /dev/null @@ -1,20 +0,0 @@ -package balancer - -import ( - cmap "github.com/orcaman/concurrent-map/v2" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" -) - -func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { - return []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, - }, - }, - } -} diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go deleted file mode 100644 index c714788e6..000000000 --- a/weed/mq/balancer/allocate_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package balancer - -import ( - cmap "github.com/orcaman/concurrent-map/v2" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "reflect" - "testing" -) - -func Test_allocateOneBroker(t *testing.T) { - brokers := cmap.New[*BrokerStats]() - brokers.SetIfAbsent("localhost:17777", &BrokerStats{ - TopicPartitionCount: 0, - ConsumerCount: 0, - CpuUsagePercent: 0, - }) - - tests := []struct { - name string - args args - wantAssignments []*mq_pb.BrokerPartitionAssignment - }{ - { - name: "test only one broker", - args: args{ - brokers: brokers, - partitionCount: 6, - }, - wantAssignments: []*mq_pb.BrokerPartitionAssignment{ - { - LeaderBroker: "localhost:17777", - FollowerBrokers: []string{"localhost:17777"}, - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: 0, - RangeStop: MaxPartitionCount, - }, - }, - }, - }, - } - testThem(t, tests) -} - -type args struct { - brokers cmap.ConcurrentMap[string, *BrokerStats] - partitionCount int -} - -func testThem(t *testing.T, tests []struct { - name string - args args - wantAssignments []*mq_pb.BrokerPartitionAssignment -}) { - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { - t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) - } - }) - } -} diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 837dc0ce3..74871925f 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -1,67 +1,16 @@ package balancer import ( - "fmt" cmap "github.com/orcaman/concurrent-map/v2" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" -) - -const ( - MaxPartitionCount = 8 * 9 * 5 * 7 //2520 ) type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] } - type BrokerStats struct { TopicPartitionCount int32 ConsumerCount int32 CpuUsagePercent int32 - Stats cmap.ConcurrentMap[string, *TopicPartitionStats] -} - -func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { - bs.TopicPartitionCount = int32(len(stats.Stats)) - bs.CpuUsagePercent = stats.CpuUsagePercent - - var consumerCount int32 - currentTopicPartitions := bs.Stats.Items() - for _, topicPartitionStats := range stats.Stats { - tps := &TopicPartitionStats{ - TopicPartition: TopicPartition{ - Namespace: topicPartitionStats.Topic.Namespace, - Topic: topicPartitionStats.Topic.Name, - RangeStart: topicPartitionStats.Partition.RangeStart, - RangeStop: topicPartitionStats.Partition.RangeStop, - }, - ConsumerCount: topicPartitionStats.ConsumerCount, - IsLeader: topicPartitionStats.IsLeader, - } - consumerCount += topicPartitionStats.ConsumerCount - key := tps.TopicPartition.String() - bs.Stats.Set(key, tps) - delete(currentTopicPartitions, key) - } - // remove the topic partitions that are not in the stats - for key := range currentTopicPartitions { - bs.Stats.Remove(key) - } - bs.ConsumerCount = consumerCount - -} - -type TopicPartition struct { - Namespace string - Topic string - RangeStart int32 - RangeStop int32 -} - -type TopicPartitionStats struct { - TopicPartition - ConsumerCount int32 - IsLeader bool } func NewBalancer() *Balancer { @@ -71,11 +20,5 @@ func NewBalancer() *Balancer { } func NewBrokerStats() *BrokerStats { - return &BrokerStats{ - Stats: cmap.New[*TopicPartitionStats](), - } -} - -func (tp *TopicPartition) String() string { - return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop) + return &BrokerStats{} } diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go deleted file mode 100644 index 55ed3e95d..000000000 --- a/weed/mq/balancer/lookup.go +++ /dev/null @@ -1,43 +0,0 @@ -package balancer - -import ( - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" -) - -func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { - // find existing topic partition assignments - for brokerStatsItem := range b.Brokers.IterBuffered() { - broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val - for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { - topicPartitionStat := topicPartitionStatsItem.Val - if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && - topicPartitionStat.TopicPartition.Topic == topic.Name { - assignment := &mq_pb.BrokerPartitionAssignment{ - Partition: &mq_pb.Partition{ - RingSize: MaxPartitionCount, - RangeStart: topicPartitionStat.RangeStart, - RangeStop: topicPartitionStat.RangeStop, - }, - } - if topicPartitionStat.IsLeader { - assignment.LeaderBroker = broker - } else { - assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) - } - assignments = append(assignments, assignment) - } - } - } - if len(assignments) > 0 { - return assignments, nil - } - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - // t := topic.FromPbTopic(request.Topic) - return allocateTopicPartitions(b.Brokers, 6), nil -} diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go index b24bf08a4..7337ba23e 100644 --- a/weed/mq/broker/broker_grpc_admin.go +++ b/weed/mq/broker/broker_grpc_admin.go @@ -12,6 +12,10 @@ import ( "sync" ) +const ( + MaxPartitionCount = 1024 +) + func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { ret := &mq_pb.FindBrokerLeaderResponse{} err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go new file mode 100644 index 000000000..a07ce4884 --- /dev/null +++ b/weed/mq/topic_allocation/allocation.go @@ -0,0 +1,81 @@ +package topic_allocation + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "modernc.org/mathutil" +) + +const ( + DefaultBrokerCount = 4 +) + +// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions +func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) { + // create a previous assignment if not exists + if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 { + prevAssignment = &mq_pb.TopicPartitionsAssignment{ + PartitionCount: topic.PartitionCount, + } + partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount + for i := 0; i < DefaultBrokerCount; i++ { + prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ + PartitionStart: int32(i * partitionCountForEachBroker), + PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount), + }) + } + } + + // create a new assignment + assignment = &mq_pb.TopicPartitionsAssignment{ + PartitionCount: prevAssignment.PartitionCount, + } + + // allocate partitions for each partition range + for _, brokerPartition := range prevAssignment.BrokerPartitions { + // allocate partitions for each partition range + leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers) + if err != nil { + return nil, err + } + + followerBrokers := make([]string, len(followers)) + for i, follower := range followers { + followerBrokers[i] = string(follower) + } + + assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ + PartitionStart: brokerPartition.PartitionStart, + PartitionStop: brokerPartition.PartitionStop, + LeaderBroker: string(leader), + FollowerBrokers: followerBrokers, + }) + } + + return +} + +func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) { + // allocate leader + leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers) + if err != nil { + return + } + + // allocate followers + followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers) + if err != nil { + return + } + + return +} + +func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) { + return +} + +func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) { + return +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 788ee150a..98784bce3 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -94,6 +94,9 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 + + // client to assign file id + assignProxy *operation.AssignProxy } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -132,6 +135,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepMasterClientConnected() + fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16) + if !util.LoadConfiguration("filer", false) { v.SetDefault("leveldb2.enabled", true) v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) @@ -184,7 +189,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) - return fs, nil + return fs, err } func (fs *FilerServer) checkWithMaster() { diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 54ddfb8b2..454148e66 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -30,6 +30,12 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { return } + isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" + if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + // proxy to volume servers var fileId string if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") { @@ -42,12 +48,6 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { return } - isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" - if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { - writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) - return - } - stats.FilerRequestCounter.WithLabelValues(r.Method).Inc() defer func() { stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds()) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 898975d14..daf63fa8d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -43,7 +43,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) + assignResult, ae := fs.assignProxy.Assign(ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) err = ae diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index a3e028bbb..4b24cae79 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -36,9 +36,7 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version) - if err != nil { - return err - } + if len(intervals) == 0 { return erasure_coding.NotFoundError } |
