diff options
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 4a89937c1..df4052096 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -2,6 +2,7 @@ package broker import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "io" "strings" "time" @@ -113,12 +114,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime) - err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() - return isConnected - }, eachLogEntryFn) + for { + lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + lock.Mutex.Lock() + lock.cond.Wait() + lock.Mutex.Unlock() + return isConnected + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } return err |
