diff options
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 9538d3063..9a7d653b5 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -100,7 +100,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return nil } - if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { + if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { if err != io.EOF { // println("stopping from persisted logs", err.Error()) return err @@ -148,7 +148,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) defer chunkedFileReader.Close() - if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + if _, err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { return err |
