aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go16
1 files changed, 9 insertions, 7 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index c358eccf6..290c84e34 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -70,12 +70,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
- messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
- lock.Mutex.Lock()
- lock.cond.Wait()
- lock.Mutex.Unlock()
- return true
- }, func(logEntry *filer_pb.LogEntry) error {
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
m := &messaging_pb.Message{}
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
@@ -87,7 +82,14 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
return nil
- })
+ }
+
+ messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ lock.Mutex.Lock()
+ lock.cond.Wait()
+ lock.Mutex.Unlock()
+ return true
+ }, eachLogEntryFn)
return err