aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_grpc_server_subscribe.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-21 00:47:27 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-21 00:47:27 -0800
commita9e6db1a8e964f10571a5e11c197fd2da141c6f7 (patch)
tree0a22db3d6c68178d8e913c1bffd550ae4127a7e1 /weed/messaging/broker/broker_grpc_server_subscribe.go
parentf0455dee683e831487c345a575b7894c0e2bf61a (diff)
parent84f05787f8eecfcb61e49882346ad5855b6bb784 (diff)
downloadseaweedfs-origin/ftp.tar.xz
seaweedfs-origin/ftp.zip
Merge branch 'master' into ftporigin/ftp
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go26
1 files changed, 15 insertions, 11 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index df4052096..3021473e5 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -101,20 +101,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return nil
}
- if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
- if err != io.EOF {
- // println("stopping from persisted logs", err.Error())
- return err
- }
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
-
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
for {
+
+ if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ if err != io.EOF {
+ // println("stopping from persisted logs", err.Error())
+ return err
+ }
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
@@ -122,6 +123,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return isConnected
}, eachLogEntryFn)
if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {