diff options
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index d21fb351f..f07a961db 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -2,6 +2,7 @@ package broker import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "io" "strings" @@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() @@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { if dayEntry.Name == startDate { - if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name) + if strings.Compare(hourMinute, startHourMinute) < 0 { return nil } } |
