1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
package broker
import (
"fmt"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
// memBuffer is one chunk of buffered log data handed over by the follower's
// log buffer, together with the start and stop times reported for that chunk.
type memBuffer struct {
	// buf holds the raw bytes of the chunk.
	buf []byte
	// startTime and stopTime are the times supplied by the log buffer's flush
	// callback for this chunk.
	startTime, stopTime time.Time
}
// PublishFollowMe serves the follower side of a publish stream.
//
// The first request on the stream must carry an init message. Every later
// request is handled by its payload:
//   - data:  added to the follower log buffer, then acknowledged with its TsNs;
//   - close: ends the receive loop;
//   - flush: records the flush timestamp and drops queued buffers whose stop
//     time is not later than it;
//   - anything else is logged as unknown and skipped.
//
// After the receive loop ends, the log buffer is shut down and every queued
// buffer that is newer than the last flush timestamp is appended to a file
// under the partition directory, retrying until the append succeeds.
//
// It returns the error of the initial Recv, an error when the init message is
// missing, a wrapped error when a data message cannot be added to the log
// buffer, or the non-EOF Recv error that terminated the loop; otherwise nil.
func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
	first, err := stream.Recv()
	if err != nil {
		return err
	}
	initMessage := first.GetInit()
	if initMessage == nil {
		return fmt.Errorf("missing init message")
	}

	// Queue of buffers emitted by the log buffer's flush callback.
	// NOTE(review): the meaning of the argument 4 is defined by buffered_queue — confirm.
	inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
	logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)

	// Until a flush message arrives, "now" is used as the flush watermark.
	lastFlushTsNs := time.Now().UnixNano()

	// Follow every published message until the stream ends or is closed.
	for {
		var req *mq_pb.PublishFollowMeRequest
		req, err = stream.Recv()
		if err != nil {
			if err != io.EOF {
				glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
			} else {
				// A clean end of stream is not reported to the caller as an error.
				err = nil
			}
			break
		}

		// Data message: buffer it and acknowledge.
		// TODO: change this to DataMessage
		if data := req.GetData(); data != nil {
			addErr := logBuffer.AddToBuffer(data)
			if addErr != nil {
				err = fmt.Errorf("failed to add message to log buffer: %w", addErr)
				glog.Errorf("Failed to add message to log buffer: %v", addErr)
				break
			}
			ack := &mq_pb.PublishFollowMeResponse{
				AckTsNs: data.TsNs,
			}
			// A failed ack is only logged; the loop keeps receiving.
			if sendErr := stream.Send(ack); sendErr != nil {
				glog.Errorf("Error sending response %v: %v", data, sendErr)
			}
			continue
		}

		// Close message: stop following.
		if closing := req.GetClose(); closing != nil {
			glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closing)
			break
		}

		// Neither data, close, nor flush: nothing to act on.
		flush := req.GetFlush()
		if flush == nil {
			glog.Errorf("unknown message: %v", req)
			continue
		}

		// Flush message: remember the watermark and discard queued buffers
		// that end at or before it.
		glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flush)
		lastFlushTsNs = flush.TsNs
		for {
			head, ok := inMemoryBuffers.PeekHead()
			if !ok || head.stopTime.UnixNano() > flush.TsNs {
				break
			}
			inMemoryBuffers.Dequeue()
		}
	}

	t := topic.FromPbTopic(initMessage.Topic)
	p := topic.FromPbPartition(initMessage.Partition)

	logBuffer.ShutdownLogBuffer()
	// Wait until the log buffer reports that everything has been flushed,
	// i.e. all messages have been handed to inMemoryBuffers.
	for {
		if logBuffer.IsAllFlushed() {
			break
		}
		time.Sleep(113 * time.Millisecond)
	}

	partitionDir := topic.PartitionDir(t, p)

	// Persist whatever is still queued.
	inMemoryBuffers.CloseInput()
	for {
		mem, ok := inMemoryBuffers.Dequeue()
		if !ok {
			break
		}
		if len(mem.buf) == 0 {
			continue
		}

		startUTC, stopUTC := mem.startTime.UTC(), mem.stopTime.UTC()
		if stopUTC.UnixNano() <= lastFlushTsNs {
			// Entirely at or before the flush watermark: skip it.
			glog.V(0).Infof("dropping remaining data at %v %v", t, p)
			continue
		}

		// TODO trim data earlier than lastFlushTsNs
		targetFile := fmt.Sprintf("%s/%s", partitionDir, startUTC.Format(topic.TIME_FORMAT))

		// Keep retrying the append until it succeeds.
		for {
			writeErr := b.appendToFile(targetFile, mem.buf)
			if writeErr == nil {
				break
			}
			glog.V(0).Infof("metadata log write failed %s: %v", targetFile, writeErr)
			time.Sleep(737 * time.Millisecond)
		}

		glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
	}

	glog.V(0).Infof("shut down follower for %v %v", t, p)

	return err
}
// buildFollowerLogBuffer creates the log buffer named "follower" whose flush
// callback wraps each non-empty flushed chunk in a memBuffer and enqueues it
// on inMemoryBuffers. Empty chunks are ignored.
//
// NOTE(review): the 5-second argument is presumably the flush interval, and
// the nil argument the read-from-disk hook — confirm against log_buffer.NewLogBuffer.
func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
	// enqueueFlushed forwards a flushed chunk, with its time range, to the queue.
	enqueueFlushed := func(_ *log_buffer.LogBuffer, from, to time.Time, data []byte, _, _ int64) {
		if len(data) > 0 {
			inMemoryBuffers.Enqueue(memBuffer{
				buf:       data,
				startTime: from,
				stopTime:  to,
			})
			glog.V(0).Infof("queue up %d~%d size %d", from.UnixNano(), to.UnixNano(), len(data))
		}
	}

	// The final callback intentionally does nothing.
	noop := func() {}

	return log_buffer.NewLogBuffer("follower", 5*time.Second, enqueueFlushed, nil, noop)
}
|