aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-18 18:47:34 -0700
committerchrislu <chris.lu@gmail.com>2023-09-18 18:47:34 -0700
commit27af11f1e80d738ed495dfca9766c913fd67a7ca (patch)
treea4a54f1e1211e273bede5ecc39ed9513316c596d /weed
parent0bb97709d41b1be4c74f01dcc65aac6d5f88bd16 (diff)
downloadseaweedfs-27af11f1e80d738ed495dfca9766c913fd67a7ca.tar.xz
seaweedfs-27af11f1e80d738ed495dfca9766c913fd67a7ca.zip
Revert "Revert "Merge branch 'master' into sub""
This reverts commit 0bb97709d41b1be4c74f01dcc65aac6d5f88bd16.
Diffstat (limited to 'weed')
-rw-r--r--weed/cluster/lock_manager/lock_manager.go2
-rw-r--r--weed/mount/dirty_pages_chunked.go7
-rw-r--r--weed/mq/balancer/allocate.go20
-rw-r--r--weed/mq/balancer/allocate_test.go62
-rw-r--r--weed/mq/balancer/balancer.go59
-rw-r--r--weed/mq/balancer/lookup.go43
-rw-r--r--weed/mq/broker/broker_grpc_admin.go4
-rw-r--r--weed/mq/topic_allocation/allocation.go81
-rw-r--r--weed/server/filer_server.go7
-rw-r--r--weed/server/filer_server_handlers.go12
-rw-r--r--weed/server/filer_server_handlers_write.go2
-rw-r--r--weed/storage/store_ec_delete.go4
12 files changed, 199 insertions, 104 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go
index 0e3e47ba4..49b951dd9 100644
--- a/weed/cluster/lock_manager/lock_manager.go
+++ b/weed/cluster/lock_manager/lock_manager.go
@@ -139,7 +139,7 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string,
func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
lm.locks.Range(func(k string, lock *Lock) bool {
- if k == key {
+ if k == key && lock != nil {
owner = lock.Owner
return false
}
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index 76a3be1be..25b071e7d 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -2,11 +2,12 @@ package mount
import (
"fmt"
+ "io"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/page_writer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "io"
- "sync"
)
type ChunkedDirtyPages struct {
@@ -82,7 +83,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade
}
-func (pages ChunkedDirtyPages) Destroy() {
+func (pages *ChunkedDirtyPages) Destroy() {
pages.uploadPipeline.Shutdown()
}
diff --git a/weed/mq/balancer/allocate.go b/weed/mq/balancer/allocate.go
new file mode 100644
index 000000000..d594c60fb
--- /dev/null
+++ b/weed/mq/balancer/allocate.go
@@ -0,0 +1,20 @@
+package balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) {
+ return []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:17777",
+ FollowerBrokers: []string{"localhost:17777"},
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: 0,
+ RangeStop: MaxPartitionCount,
+ },
+ },
+ }
+}
diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/balancer/allocate_test.go
new file mode 100644
index 000000000..c714788e6
--- /dev/null
+++ b/weed/mq/balancer/allocate_test.go
@@ -0,0 +1,62 @@
+package balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "reflect"
+ "testing"
+)
+
+func Test_allocateOneBroker(t *testing.T) {
+ brokers := cmap.New[*BrokerStats]()
+ brokers.SetIfAbsent("localhost:17777", &BrokerStats{
+ TopicPartitionCount: 0,
+ ConsumerCount: 0,
+ CpuUsagePercent: 0,
+ })
+
+ tests := []struct {
+ name string
+ args args
+ wantAssignments []*mq_pb.BrokerPartitionAssignment
+ }{
+ {
+ name: "test only one broker",
+ args: args{
+ brokers: brokers,
+ partitionCount: 6,
+ },
+ wantAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:17777",
+ FollowerBrokers: []string{"localhost:17777"},
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: 0,
+ RangeStop: MaxPartitionCount,
+ },
+ },
+ },
+ },
+ }
+ testThem(t, tests)
+}
+
+type args struct {
+ brokers cmap.ConcurrentMap[string, *BrokerStats]
+ partitionCount int
+}
+
+func testThem(t *testing.T, tests []struct {
+ name string
+ args args
+ wantAssignments []*mq_pb.BrokerPartitionAssignment
+}) {
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) {
+ t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments)
+ }
+ })
+ }
+}
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 74871925f..837dc0ce3 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -1,16 +1,67 @@
package balancer
import (
+ "fmt"
cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+const (
+ MaxPartitionCount = 8 * 9 * 5 * 7 //2520
)
type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats]
}
+
type BrokerStats struct {
TopicPartitionCount int32
ConsumerCount int32
CpuUsagePercent int32
+ Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
+}
+
+func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
+ bs.TopicPartitionCount = int32(len(stats.Stats))
+ bs.CpuUsagePercent = stats.CpuUsagePercent
+
+ var consumerCount int32
+ currentTopicPartitions := bs.Stats.Items()
+ for _, topicPartitionStats := range stats.Stats {
+ tps := &TopicPartitionStats{
+ TopicPartition: TopicPartition{
+ Namespace: topicPartitionStats.Topic.Namespace,
+ Topic: topicPartitionStats.Topic.Name,
+ RangeStart: topicPartitionStats.Partition.RangeStart,
+ RangeStop: topicPartitionStats.Partition.RangeStop,
+ },
+ ConsumerCount: topicPartitionStats.ConsumerCount,
+ IsLeader: topicPartitionStats.IsLeader,
+ }
+ consumerCount += topicPartitionStats.ConsumerCount
+ key := tps.TopicPartition.String()
+ bs.Stats.Set(key, tps)
+ delete(currentTopicPartitions, key)
+ }
+ // remove the topic partitions that are not in the stats
+ for key := range currentTopicPartitions {
+ bs.Stats.Remove(key)
+ }
+ bs.ConsumerCount = consumerCount
+
+}
+
+type TopicPartition struct {
+ Namespace string
+ Topic string
+ RangeStart int32
+ RangeStop int32
+}
+
+type TopicPartitionStats struct {
+ TopicPartition
+ ConsumerCount int32
+ IsLeader bool
}
func NewBalancer() *Balancer {
@@ -20,5 +71,11 @@ func NewBalancer() *Balancer {
}
func NewBrokerStats() *BrokerStats {
- return &BrokerStats{}
+ return &BrokerStats{
+ Stats: cmap.New[*TopicPartitionStats](),
+ }
+}
+
+func (tp *TopicPartition) String() string {
+ return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
}
diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go
new file mode 100644
index 000000000..55ed3e95d
--- /dev/null
+++ b/weed/mq/balancer/lookup.go
@@ -0,0 +1,43 @@
+package balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+ // find existing topic partition assignments
+ for brokerStatsItem := range b.Brokers.IterBuffered() {
+ broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
+ for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
+ topicPartitionStat := topicPartitionStatsItem.Val
+ if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
+ topicPartitionStat.TopicPartition.Topic == topic.Name {
+ assignment := &mq_pb.BrokerPartitionAssignment{
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: topicPartitionStat.RangeStart,
+ RangeStop: topicPartitionStat.RangeStop,
+ },
+ }
+ if topicPartitionStat.IsLeader {
+ assignment.LeaderBroker = broker
+ } else {
+ assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker)
+ }
+ assignments = append(assignments, assignment)
+ }
+ }
+ }
+ if len(assignments) > 0 {
+ return assignments, nil
+ }
+
+ // find the topic partitions on the filer
+ // if the topic is not found
+ // if the request is_for_publish
+ // create the topic
+ // if the request is_for_subscribe
+ // return error not found
+ // t := topic.FromPbTopic(request.Topic)
+ return allocateTopicPartitions(b.Brokers, 6), nil
+}
diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go
index 7337ba23e..b24bf08a4 100644
--- a/weed/mq/broker/broker_grpc_admin.go
+++ b/weed/mq/broker/broker_grpc_admin.go
@@ -12,10 +12,6 @@ import (
"sync"
)
-const (
- MaxPartitionCount = 1024
-)
-
func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
ret := &mq_pb.FindBrokerLeaderResponse{}
err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go
deleted file mode 100644
index a07ce4884..000000000
--- a/weed/mq/topic_allocation/allocation.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package topic_allocation
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "modernc.org/mathutil"
-)
-
-const (
- DefaultBrokerCount = 4
-)
-
-// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
-func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
- // create a previous assignment if not exists
- if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
- prevAssignment = &mq_pb.TopicPartitionsAssignment{
- PartitionCount: topic.PartitionCount,
- }
- partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
- for i := 0; i < DefaultBrokerCount; i++ {
- prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
- PartitionStart: int32(i * partitionCountForEachBroker),
- PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
- })
- }
- }
-
- // create a new assignment
- assignment = &mq_pb.TopicPartitionsAssignment{
- PartitionCount: prevAssignment.PartitionCount,
- }
-
- // allocate partitions for each partition range
- for _, brokerPartition := range prevAssignment.BrokerPartitions {
- // allocate partitions for each partition range
- leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
- if err != nil {
- return nil, err
- }
-
- followerBrokers := make([]string, len(followers))
- for i, follower := range followers {
- followerBrokers[i] = string(follower)
- }
-
- assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
- PartitionStart: brokerPartition.PartitionStart,
- PartitionStop: brokerPartition.PartitionStop,
- LeaderBroker: string(leader),
- FollowerBrokers: followerBrokers,
- })
- }
-
- return
-}
-
-func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
- // allocate leader
- leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
- if err != nil {
- return
- }
-
- // allocate followers
- followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
- if err != nil {
- return
- }
-
- return
-}
-
-func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
- return
-}
-
-func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
- return
-}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 98784bce3..788ee150a 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -94,9 +94,6 @@ type FilerServer struct {
// track known metadata listeners
knownListenersLock sync.Mutex
knownListeners map[int32]int32
-
- // client to assign file id
- assignProxy *operation.AssignProxy
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
@@ -135,8 +132,6 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepMasterClientConnected()
- fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16)
-
if !util.LoadConfiguration("filer", false) {
v.SetDefault("leveldb2.enabled", true)
v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
@@ -189,7 +184,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
- return fs, err
+ return fs, nil
}
func (fs *FilerServer) checkWithMaster() {
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 454148e66..54ddfb8b2 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -30,12 +30,6 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
return
}
- isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
- if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
- writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
- return
- }
-
// proxy to volume servers
var fileId string
if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
@@ -48,6 +42,12 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
return
}
+ isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
+ if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
defer func() {
stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index daf63fa8d..898975d14 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -43,7 +43,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
ar, altRequest := so.ToAssignRequests(1)
- assignResult, ae := fs.assignProxy.Assign(ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
err = ae
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index 4b24cae79..a3e028bbb 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -36,7 +36,9 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle
func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
-
+ if err != nil {
+ return err
+ }
if len(intervals) == 0 {
return erasure_coding.NotFoundError
}