aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-19 23:37:04 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-19 23:37:50 -0700
commitce3cb25cfbf30a06348386210f72cc51c3fbd13a (patch)
tree217836bc07221d97a180f7f2fba41dd51ddb3eae /weed/messaging
parentf37323222751c104723273293a0b15b209021f32 (diff)
downloadseaweedfs-ce3cb25cfbf30a06348386210f72cc51c3fbd13a.tar.xz
seaweedfs-ce3cb25cfbf30a06348386210f72cc51c3fbd13a.zip
working for in memory single log buffer
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go4
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go12
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go69
-rw-r--r--weed/messaging/broker/topic_lock.go4
-rw-r--r--weed/messaging/client/publisher.go2
5 files changed, 38 insertions, 53 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index c1ef063fb..7194dfcfc 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -16,10 +16,6 @@ import (
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
- if topicConfig.IsTransient {
- return nil
- }
-
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
if err2 != nil {
return err2
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 210127be3..985f708b5 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -2,7 +2,6 @@ package broker
import (
"io"
- "time"
"github.com/golang/protobuf/proto"
@@ -77,16 +76,9 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
continue
}
- m := &messaging_pb.Message{
- Timestamp: time.Now().UnixNano(),
- Key: in.Data.Key,
- Value: in.Data.Value,
- Headers: in.Data.Headers,
- }
-
- // fmt.Printf("received: %d : %s\n", len(m.Value), string(m.Value))
+ // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
- data, err := proto.Marshal(m)
+ data, err := proto.Marshal(in.Data)
if err != nil {
glog.Errorf("marshall error: %v\n", err)
continue
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index acf0330c6..c358eccf6 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -1,6 +1,7 @@
package broker
import (
+ "fmt"
"io"
"time"
@@ -9,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
@@ -23,12 +23,22 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
+ var messageCount int64
subscriberId := in.Init.SubscriberId
- println("+ subscriber:", subscriberId)
- defer println("- subscriber:", subscriberId)
+ fmt.Printf("+ subscriber %s\n", subscriberId)
+ defer func() {
+ fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount)
+ }()
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
+ IsTransient: true,
+ }
+
+ if err = stream.Send(&messaging_pb.BrokerMessage{
+ Redirect: nil,
+ }); err != nil {
+ return err
}
// get lock
@@ -52,7 +62,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// an error returned will end the subscription
eachMessageFn := func(m *messaging_pb.Message) error {
err := stream.Send(&messaging_pb.BrokerMessage{
- Data: m,
+ Data: m,
})
if err != nil {
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
@@ -60,42 +70,25 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
- // loop through all messages
- for {
-
- _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime)
-
- for pos := 0; pos+4 < len(buf); {
-
- size := util.BytesToUint32(buf[pos : pos+4])
- entryData := buf[pos+4 : pos+4+int(size)]
-
- logEntry := &filer_pb.LogEntry{}
- if err = proto.Unmarshal(entryData, logEntry); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
- pos += 4 + int(size)
- continue
- }
-
- m := &messaging_pb.Message{}
- if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
- pos += 4 + int(size)
- continue
- }
-
- // fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value))
- if err = eachMessageFn(m); err != nil {
- return err
- }
-
- lastReadTime = time.Unix(0, m.Timestamp)
- pos += 4 + int(size)
- }
-
+ messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
- }
+ return true
+ }, func(logEntry *filer_pb.LogEntry) error {
+ m := &messaging_pb.Message{}
+ if err = proto.Unmarshal(logEntry.Data, m); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ return err
+ }
+ // fmt.Printf("sending : %d bytes\n", len(m.Value))
+ if err = eachMessageFn(m); err != nil {
+ glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
+ return err
+ }
+ return nil
+ })
+
+ return err
}
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
index 652ff0545..74861a147 100644
--- a/weed/messaging/broker/topic_lock.go
+++ b/weed/messaging/broker/topic_lock.go
@@ -41,6 +41,10 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC
flushFn := func(startTime, stopTime time.Time, buf []byte) {
+ if topicConfig.IsTransient {
+ return
+ }
+
targetFile := fmt.Sprintf(
"%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
filer2.TopicsDir, tp.Namespace, tp.Topic,
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go
index 238b67783..3480ff55d 100644
--- a/weed/messaging/client/publisher.go
+++ b/weed/messaging/client/publisher.go
@@ -61,7 +61,7 @@ func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, er
}, nil
}
-func (p *Publisher) Publish(m *messaging_pb.RawData) error {
+func (p *Publisher) Publish(m *messaging_pb.Message) error {
return p.publishClient.Send(&messaging_pb.PublishRequest{
Data: m,