diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-25 01:18:44 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-25 01:18:44 -0700 |
| commit | 2baed2e1e995ad331985a7b8c359e732b223ad3a (patch) | |
| tree | b891a9330cb355d55cd8e35424d1eb0101c4b47e /weed/messaging | |
| parent | a814f3f0a80ac511132bd3ac97356f333f128b1c (diff) | |
| download | seaweedfs-2baed2e1e995ad331985a7b8c359e732b223ad3a.tar.xz seaweedfs-2baed2e1e995ad331985a7b8c359e732b223ad3a.zip | |
avoid possible metadata subscription data loss
Previous implementation append filer logs into one file. So one file is not always sorted, which can lead to miss reading some entries, especially when different filers have different write throughput.
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index d21fb351f..f07a961db 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -2,6 +2,7 @@ package broker import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "io" "strings" @@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() @@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { if dayEntry.Name == startDate { - if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name) + if strings.Compare(hourMinute, startHourMinute) < 0 { return nil } } |
