diff options
Diffstat (limited to 'weed/messaging/broker')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 290c84e34..c5e033420 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -3,10 +3,12 @@ package broker import ( "fmt" "io" + "strings" "time" "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -57,6 +59,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs case messaging_pb.SubscriberMessage_InitMessage_LATEST: case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: } + var processedTsNs int64 // how to process each message // an error returned will end the subscription @@ -81,9 +84,18 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) return err } + processedTsNs = logEntry.TsNs return nil } + if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { + return err + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() @@ -94,3 +106,36 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } + +func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { + startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + + sizeBuf := make([]byte, 4) + startTsNs := startTime.UnixNano() + + topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic) + + return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { + dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) + return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { + if dayEntry.Name == startDate { + if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + return nil + } + } + // println("processing", hourMinuteEntry.FullPath) + chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) + defer chunkedFileReader.Close() + if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + chunkedFileReader.Close() + if err == io.EOF { + return nil + } + return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) + } + return nil + }, "", false, 24*60) + }, startDate, true, 366) + +} |
