aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-07-05 15:43:06 -0700
committerChris Lu <chris.lu@gmail.com>2020-07-05 15:43:06 -0700
commit55e40b08fc5f796c8d315b100a9c29dcf3a092e6 (patch)
tree9a80958b0693a09e007fc22266669c68a7c55cb1 /weed/messaging
parent881e0fde2e82293c779dfcc8a339122dec8a8ef6 (diff)
downloadseaweedfs-55e40b08fc5f796c8d315b100a9c29dcf3a092e6.tar.xz
seaweedfs-55e40b08fc5f796c8d315b100a9c29dcf3a092e6.zip
refactoring
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go4
1 files changed, 2 insertions, 2 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 9538d3063..9a7d653b5 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -100,7 +100,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return nil
}
- if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
if err != io.EOF {
// println("stopping from persisted logs", err.Error())
return err
@@ -148,7 +148,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
defer chunkedFileReader.Close()
- if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ if _, err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
return err