aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker')
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go45
1 files changed, 45 insertions, 0 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 290c84e34..c5e033420 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -3,10 +3,12 @@ package broker
import (
"fmt"
"io"
+ "strings"
"time"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
@@ -57,6 +59,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
}
+ var processedTsNs int64
// how to process each message
// an error returned will end the subscription
@@ -81,9 +84,18 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
+ processedTsNs = logEntry.TsNs
return nil
}
+ if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ return err
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
@@ -94,3 +106,36 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
+
+func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
+ startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+
+ sizeBuf := make([]byte, 4)
+ startTsNs := startTime.UnixNano()
+
+ topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
+
+ return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
+ dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
+ return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
+ if dayEntry.Name == startDate {
+ if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
+ return nil
+ }
+ }
+ // println("processing", hourMinuteEntry.FullPath)
+ chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
+ defer chunkedFileReader.Close()
+ if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ chunkedFileReader.Close()
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
+ }
+ return nil
+ }, "", false, 24*60)
+ }, startDate, true, 366)
+
+}