aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_grpc_server_subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go6
1 files changed, 4 insertions, 2 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index d21fb351f..f07a961db 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -2,6 +2,7 @@ package broker
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"io"
"strings"
@@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
@@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
if dayEntry.Name == startDate {
- if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name)
+ if strings.Compare(hourMinute, startHourMinute) < 0 {
return nil
}
}