aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker')
-rw-r--r--weed/messaging/broker/broker_append.go4
-rw-r--r--weed/messaging/broker/broker_grpc_server.go22
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go116
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go56
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go28
-rw-r--r--weed/messaging/broker/broker_server.go96
-rw-r--r--weed/messaging/broker/consistent_distribution.go38
-rw-r--r--weed/messaging/broker/consistent_distribution_test.go32
-rw-r--r--weed/messaging/broker/subscription.go82
-rw-r--r--weed/messaging/broker/topic_lock.go103
-rw-r--r--weed/messaging/broker/topic_manager.go124
11 files changed, 516 insertions, 185 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index e87e197b0..80f107e00 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"fmt"
+ "io"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -94,6 +95,9 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
for _, filer := range broker.option.Filers {
if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err == io.EOF {
+ return
+ }
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
} else {
break
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
index 447620a6b..6918a28a6 100644
--- a/weed/messaging/broker/broker_grpc_server.go
+++ b/weed/messaging/broker/broker_grpc_server.go
@@ -2,7 +2,10 @@ package broker
import (
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -10,6 +13,25 @@ func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messagin
panic("implement me")
}
+func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
+ resp := &messaging_pb.DeleteTopicResponse{}
+ dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
+ if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
+ return nil, err
+ } else if exists {
+ err = filer_pb.Remove(broker, dir, entry, true, true, true)
+ }
+ return resp, nil
+}
+
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
panic("implement me")
}
+
+func genTopicDir(namespace, topic string) string {
+ return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace, topic)
+}
+
+func genTopicDirEntry(namespace, topic string) (dir, entry string) {
+ return fmt.Sprintf("%s/%s", filer2.TopicsDir, namespace), topic
+}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
new file mode 100644
index 000000000..3c14f3220
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -0,0 +1,116 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+/*
+Topic discovery:
+
+When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
+
+The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
+Otherwise, just host the topic.
+
+So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
+If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
+
+*/
+
+func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
+
+ t := &messaging_pb.FindBrokerResponse{}
+ var peers []string
+
+ targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
+
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
+ Resource: targetTopicPartition,
+ })
+ if err != nil {
+ return err
+ }
+ if resp.Found && len(resp.Resources) > 0 {
+ t.Broker = resp.Resources[0].GrpcAddresses
+ return nil
+ }
+ for _, b := range resp.Resources {
+ peers = append(peers, b.GrpcAddresses)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ t.Broker = PickMember(peers, []byte(targetTopicPartition))
+
+ return t, nil
+
+}
+
+func (broker *MessageBroker) checkFilers() {
+
+ // contact a filer about masters
+ var masters []string
+ found := false
+ for !found {
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received master list: %s", masters)
+
+ // contact each masters for filers
+ var filers []string
+ found = false
+ for !found {
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to list filers: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received filer list: %s", filers)
+
+ broker.option.Filers = filers
+
+}
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index b3a909a6c..dc11061af 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -1,11 +1,15 @@
package broker
import (
+ "crypto/md5"
+ "fmt"
"io"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -44,27 +48,24 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
Topic: in.Init.Topic,
Partition: in.Init.Partition,
}
- tl := broker.topicLocks.RequestLock(tp, topicConfig, true)
- defer broker.topicLocks.ReleaseLock(tp, true)
-
- updatesChan := make(chan int32)
-
- go func() {
- for update := range updatesChan {
- if err := stream.Send(&messaging_pb.PublishResponse{
- Config: &messaging_pb.PublishResponse_ConfigMessage{
- PartitionCount: update,
- },
- }); err != nil {
- glog.V(0).Infof("err sending publish response: %v", err)
- return
- }
- }
- }()
+ tpDir := fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, tp.Namespace, tp.Topic)
+ md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
+ // println("chan data stored under", tpDir, "as", md5File)
+
+ if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
+ return fmt.Errorf("channel is already closed")
+ }
+
+ tl := broker.topicManager.RequestLock(tp, topicConfig, true)
+ defer broker.topicManager.ReleaseLock(tp, true)
+
+ md5hash := md5.New()
// process each message
for {
+ // println("recv")
in, err := stream.Recv()
+ // glog.V(0).Infof("recieved %v err: %v", in, err)
if err == io.EOF {
return nil
}
@@ -86,5 +87,26 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
tl.logBuffer.AddToBuffer(in.Data.Key, data)
+ if in.Data.IsClose {
+ // println("server received closing")
+ break
+ }
+
+ md5hash.Write(in.Data.Value)
+
+ }
+
+ if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
+ glog.V(0).Infof("err writing %s: %v", md5File, err)
}
+
+ // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
+
+ // send the close ack
+ // println("server send ack closing")
+ if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
+ glog.V(0).Infof("err sending close response: %v", err)
+ }
+ return nil
+
}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 379063eed..eb6946e81 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -37,9 +37,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// IsTransient: true,
}
- if err = stream.Send(&messaging_pb.BrokerMessage{
- Redirect: nil,
- }); err != nil {
+ if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
return err
}
@@ -49,8 +47,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
Topic: in.Init.Topic,
Partition: in.Init.Partition,
}
- lock := broker.topicLocks.RequestLock(tp, topicConfig, false)
- defer broker.topicLocks.ReleaseLock(tp, false)
+ lock := broker.topicManager.RequestLock(tp, topicConfig, false)
+ subscription := lock.subscriptions.AddSubscription(subscriberId)
+ defer broker.topicManager.ReleaseLock(tp, false)
lastReadTime := time.Now()
switch in.Init.StartPosition {
@@ -58,6 +57,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime = time.Unix(0, in.Init.TimestampNs)
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
+ lastReadTime = time.Unix(0, 0)
}
var processedTsNs int64
@@ -84,11 +84,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
+ if m.IsClose {
+ // println("processed EOF")
+ return io.EOF
+ }
processedTsNs = logEntry.TsNs
+ messageCount++
return nil
}
if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ // println("stopping from persisted logs")
return err
}
@@ -96,10 +102,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime = time.Unix(0, processedTsNs)
}
- messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
- lock.Mutex.Lock()
- lock.cond.Wait()
- lock.Mutex.Unlock()
+ err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ subscription.Wait()
return true
}, eachLogEntryFn)
@@ -114,7 +118,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
+ topicDir := genTopicDir(tp.Namespace, tp.Topic)
partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
@@ -125,7 +129,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
return nil
}
}
- if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){
+ if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
return nil
}
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
@@ -134,7 +138,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
- return nil
+ return err
}
return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
}
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 29c227274..0c04d2841 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -16,6 +16,7 @@ type MessageBrokerOption struct {
Filers []string
DefaultReplication string
MaxMB int
+ Ip string
Port int
Cipher bool
}
@@ -23,7 +24,7 @@ type MessageBrokerOption struct {
type MessageBroker struct {
option *MessageBrokerOption
grpcDialOption grpc.DialOption
- topicLocks *TopicLocks
+ topicManager *TopicManager
}
func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
@@ -33,77 +34,66 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
grpcDialOption: grpcDialOption,
}
- messageBroker.topicLocks = NewTopicLocks(messageBroker)
+ messageBroker.topicManager = NewTopicManager(messageBroker)
- messageBroker.checkPeers()
+ messageBroker.checkFilers()
- // go messageBroker.loopForEver()
+ go messageBroker.keepConnectedToOneFiler()
return messageBroker, nil
}
-func (broker *MessageBroker) loopForEver() {
+func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
- broker.checkPeers()
- time.Sleep(3 * time.Second)
- }
-
-}
-
-func (broker *MessageBroker) checkPeers() {
-
- // contact a filer about masters
- var masters []string
- found := false
- for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.KeepConnected(context.Background())
if err != nil {
+ glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- masters = append(masters, resp.Masters...)
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []string
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
- ClientType: "filer",
- })
- if err != nil {
+
+ initRequest := &filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }
+ for _, tp := range broker.topicManager.ListTopicPartitions() {
+ initRequest.Resources = append(initRequest.Resources, tp.String())
+ }
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- filers = append(filers, resp.GrpcAddresses...)
-
+ // TODO send events of adding/removing topics
+
+ glog.V(0).Infof("conntected with filer: %v", filer)
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("send heartbeat")
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("received reply")
+ time.Sleep(11 * time.Second)
+ // println("woke up")
+ }
return nil
})
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
+ time.Sleep(3 * time.Second)
}
}
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
}
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go
new file mode 100644
index 000000000..465a2a8f2
--- /dev/null
+++ b/weed/messaging/broker/consistent_distribution.go
@@ -0,0 +1,38 @@
+package broker
+
+import (
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+)
+
+type Member string
+
+func (m Member) String() string {
+ return string(m)
+}
+
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+func PickMember(members []string, key []byte) string {
+ cfg := consistent.Config{
+ PartitionCount: 9791,
+ ReplicationFactor: 2,
+ Load: 1.25,
+ Hasher: hasher{},
+ }
+
+ cmembers := []consistent.Member{}
+ for _, m := range members {
+ cmembers = append(cmembers, Member(m))
+ }
+
+ c := consistent.New(cmembers, cfg)
+
+ m := c.LocateKey(key)
+
+ return m.String()
+}
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go
new file mode 100644
index 000000000..f58fe4e0e
--- /dev/null
+++ b/weed/messaging/broker/consistent_distribution_test.go
@@ -0,0 +1,32 @@
+package broker
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestPickMember(t *testing.T) {
+
+ servers := []string{
+ "s1:port",
+ "s2:port",
+ "s3:port",
+ "s5:port",
+ "s4:port",
+ }
+
+ total := 1000
+
+ distribution := make(map[string]int)
+ for i := 0; i < total; i++ {
+ tp := fmt.Sprintf("tp:%2d", i)
+ m := PickMember(servers, []byte(tp))
+ // println(tp, "=>", m)
+ distribution[m]++
+ }
+
+ for member, count := range distribution {
+ fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
+ }
+
+}
diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go
new file mode 100644
index 000000000..f74b0c546
--- /dev/null
+++ b/weed/messaging/broker/subscription.go
@@ -0,0 +1,82 @@
+package broker
+
+import (
+ "sync"
+)
+
+type TopicPartitionSubscription struct {
+ sync.Mutex
+ name string
+ lastReadTsNs int64
+ cond *sync.Cond
+}
+
+func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription {
+ t := &TopicPartitionSubscription{
+ name: name,
+ }
+ t.cond = sync.NewCond(t)
+ return t
+}
+
+func (s *TopicPartitionSubscription) Wait() {
+ s.Mutex.Lock()
+ s.cond.Wait()
+ s.Mutex.Unlock()
+}
+
+func (s *TopicPartitionSubscription) NotifyOne() {
+ // notify one waiting goroutine
+ s.cond.Signal()
+}
+
+type TopicPartitionSubscriptions struct {
+ sync.Mutex
+ cond *sync.Cond
+ subscriptions map[string]*TopicPartitionSubscription
+ subscriptionsLock sync.RWMutex
+}
+
+func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions {
+ m := &TopicPartitionSubscriptions{
+ subscriptions: make(map[string]*TopicPartitionSubscription),
+ }
+ m.cond = sync.NewCond(m)
+ return m
+}
+
+func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription {
+ m.subscriptionsLock.Lock()
+ defer m.subscriptionsLock.Unlock()
+
+ if s, found := m.subscriptions[subscription]; found {
+ return s
+ }
+
+ s := NewTopicPartitionSubscription(subscription)
+ m.subscriptions[subscription] = s
+
+ return s
+
+}
+
+func (m *TopicPartitionSubscriptions) NotifyAll() {
+
+ m.subscriptionsLock.RLock()
+ defer m.subscriptionsLock.RUnlock()
+
+ for name, tps := range m.subscriptions {
+ println("notifying", name)
+ tps.NotifyOne()
+ }
+
+}
+
+func (m *TopicPartitionSubscriptions) Wait() {
+ m.Mutex.Lock()
+ m.cond.Wait()
+ for _, tps := range m.subscriptions {
+ tps.NotifyOne()
+ }
+ m.Mutex.Unlock()
+}
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
deleted file mode 100644
index f8a5aa171..000000000
--- a/weed/messaging/broker/topic_lock.go
+++ /dev/null
@@ -1,103 +0,0 @@
-package broker
-
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
-)
-
-type TopicPartition struct {
- Namespace string
- Topic string
- Partition int32
-}
-type TopicLock struct {
- sync.Mutex
- cond *sync.Cond
- subscriberCount int
- publisherCount int
- logBuffer *log_buffer.LogBuffer
-}
-
-type TopicLocks struct {
- sync.Mutex
- locks map[TopicPartition]*TopicLock
- broker *MessageBroker
-}
-
-func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks {
- return &TopicLocks{
- locks: make(map[TopicPartition]*TopicLock),
- broker: messageBroker,
- }
-}
-
-func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
-
- flushFn := func(startTime, stopTime time.Time, buf []byte) {
-
- if topicConfig.IsTransient {
- // return
- }
-
- // fmt.Printf("flushing with topic config %+v\n", topicConfig)
-
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer2.TopicsDir, tp.Namespace, tp.Topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- tp.Partition,
- )
-
- if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
- }
- logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
- tl.cond.Broadcast()
- })
-
- return logBuffer
-}
-
-func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock {
- tl.Lock()
- defer tl.Unlock()
-
- lock, found := tl.locks[partition]
- if !found {
- lock = &TopicLock{}
- lock.cond = sync.NewCond(&lock.Mutex)
- tl.locks[partition] = lock
- lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig)
- }
- if isPublisher {
- lock.publisherCount++
- } else {
- lock.subscriberCount++
- }
- return lock
-}
-
-func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
- tl.Lock()
- defer tl.Unlock()
-
- lock, found := tl.locks[partition]
- if !found {
- return
- }
- if isPublisher {
- lock.publisherCount--
- } else {
- lock.subscriberCount--
- }
- if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tl.locks, partition)
- }
-}
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
new file mode 100644
index 000000000..e9f8903e8
--- /dev/null
+++ b/weed/messaging/broker/topic_manager.go
@@ -0,0 +1,124 @@
+package broker
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+type TopicPartition struct {
+ Namespace string
+ Topic string
+ Partition int32
+}
+
+const (
+ TopicPartitionFmt = "%s/%s_%02d"
+)
+
+func (tp *TopicPartition) String() string {
+ return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
+}
+
+type TopicControl struct {
+ sync.Mutex
+ cond *sync.Cond
+ subscriberCount int
+ publisherCount int
+ logBuffer *log_buffer.LogBuffer
+ subscriptions *TopicPartitionSubscriptions
+}
+
+type TopicManager struct {
+ sync.Mutex
+ topicCursors map[TopicPartition]*TopicControl
+ broker *MessageBroker
+}
+
+func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
+ return &TopicManager{
+ topicCursors: make(map[TopicPartition]*TopicControl),
+ broker: messageBroker,
+ }
+}
+
+func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+
+ flushFn := func(startTime, stopTime time.Time, buf []byte) {
+
+ if topicConfig.IsTransient {
+ // return
+ }
+
+ // fmt.Printf("flushing with topic config %+v\n", topicConfig)
+
+ targetFile := fmt.Sprintf(
+ "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ filer2.TopicsDir, tp.Namespace, tp.Topic,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ tp.Partition,
+ )
+
+ if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+ }
+ logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
+ tc.subscriptions.NotifyAll()
+ })
+
+ return logBuffer
+}
+
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
+ tm.Lock()
+ defer tm.Unlock()
+
+ tc, found := tm.topicCursors[partition]
+ if !found {
+ tc = &TopicControl{}
+ tm.topicCursors[partition] = tc
+ tc.subscriptions = NewTopicPartitionSubscriptions()
+ tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
+ }
+ if isPublisher {
+ tc.publisherCount++
+ } else {
+ tc.subscriberCount++
+ }
+ return tc
+}
+
+func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
+ tm.Lock()
+ defer tm.Unlock()
+
+ lock, found := tm.topicCursors[partition]
+ if !found {
+ return
+ }
+ if isPublisher {
+ lock.publisherCount--
+ } else {
+ lock.subscriberCount--
+ }
+ if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
+ delete(tm.topicCursors, partition)
+ lock.logBuffer.Shutdown()
+ }
+}
+
+func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
+ tm.Lock()
+ defer tm.Unlock()
+
+ for k := range tm.topicCursors {
+ tps = append(tps, k)
+ }
+ return
+}