diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-28 02:05:44 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-28 02:05:44 -0700 |
| commit | 8ed490164e117f7bbf5a1fd59c867b1c820dbae2 (patch) | |
| tree | f6cff661917719cbc612d8254b3ca80c36aadb36 | |
| parent | 66effaed9e99b48255e1a0bc2373d2fa645919b3 (diff) | |
| download | seaweedfs-8ed490164e117f7bbf5a1fd59c867b1c820dbae2.tar.xz seaweedfs-8ed490164e117f7bbf5a1fd59c867b1c820dbae2.zip | |
refactoring
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index c358eccf6..290c84e34 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -70,12 +70,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } - messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() - return true - }, func(logEntry *filer_pb.LogEntry) error { + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { m := &messaging_pb.Message{} if err = proto.Unmarshal(logEntry.Data, m); err != nil { glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) @@ -87,7 +82,14 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } return nil - }) + } + + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + lock.Mutex.Lock() + lock.cond.Wait() + lock.Mutex.Unlock() + return true + }, eachLogEntryFn) return err |
