aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_grpc_server_subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go69
1 files changed, 31 insertions, 38 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index acf0330c6..c358eccf6 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -1,6 +1,7 @@
package broker
import (
+ "fmt"
"io"
"time"
@@ -9,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
@@ -23,12 +23,22 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
+ var messageCount int64
subscriberId := in.Init.SubscriberId
- println("+ subscriber:", subscriberId)
- defer println("- subscriber:", subscriberId)
+ fmt.Printf("+ subscriber %s\n", subscriberId)
+ defer func() {
+ fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount)
+ }()
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
+ IsTransient: true,
+ }
+
+ if err = stream.Send(&messaging_pb.BrokerMessage{
+ Redirect: nil,
+ }); err != nil {
+ return err
}
// get lock
@@ -52,7 +62,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// an error returned will end the subscription
eachMessageFn := func(m *messaging_pb.Message) error {
err := stream.Send(&messaging_pb.BrokerMessage{
- Data: m,
+ Data: m,
})
if err != nil {
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
@@ -60,42 +70,25 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
- // loop through all messages
- for {
-
- _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime)
-
- for pos := 0; pos+4 < len(buf); {
-
- size := util.BytesToUint32(buf[pos : pos+4])
- entryData := buf[pos+4 : pos+4+int(size)]
-
- logEntry := &filer_pb.LogEntry{}
- if err = proto.Unmarshal(entryData, logEntry); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
- pos += 4 + int(size)
- continue
- }
-
- m := &messaging_pb.Message{}
- if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
- pos += 4 + int(size)
- continue
- }
-
- // fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value))
- if err = eachMessageFn(m); err != nil {
- return err
- }
-
- lastReadTime = time.Unix(0, m.Timestamp)
- pos += 4 + int(size)
- }
-
+ messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
- }
+ return true
+ }, func(logEntry *filer_pb.LogEntry) error {
+ m := &messaging_pb.Message{}
+ if err = proto.Unmarshal(logEntry.Data, m); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ return err
+ }
+ // fmt.Printf("sending : %d bytes\n", len(m.Value))
+ if err = eachMessageFn(m); err != nil {
+ glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
+ return err
+ }
+ return nil
+ })
+
+ return err
}