aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_grpc_server_subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go22
1 files changed, 16 insertions, 6 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 4a89937c1..df4052096 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -2,6 +2,7 @@ package broker
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"io"
"strings"
"time"
@@ -113,12 +114,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
- err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
- lock.Mutex.Lock()
- lock.cond.Wait()
- lock.Mutex.Unlock()
- return isConnected
- }, eachLogEntryFn)
+ for {
+ lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ lock.Mutex.Lock()
+ lock.cond.Wait()
+ lock.Mutex.Unlock()
+ return isConnected
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
return err