aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_pub_follow.go
blob: d8f472249f0e0b58c2bfa37c207095feac0d4664 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package broker

import (
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

type memBuffer struct {
	buf       []byte
	startTime time.Time
	stopTime  time.Time
}

func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
	var req *mq_pb.PublishFollowMeRequest
	req, err = stream.Recv()
	if err != nil {
		return err
	}
	initMessage := req.GetInit()
	if initMessage == nil {
		return fmt.Errorf("missing init message")
	}

	// create an in-memory queue of buffered messages
	inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
	logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)

	lastFlushTsNs := time.Now().UnixNano()

	// follow each published messages
	for {
		// receive a message
		req, err = stream.Recv()
		if err != nil {
			if err == io.EOF {
				err = nil
				break
			}
			glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
			break
		}

		// Process the received message
		if dataMessage := req.GetData(); dataMessage != nil {

			// TODO: change this to DataMessage
			// log the message
			if addErr := logBuffer.AddToBuffer(dataMessage); addErr != nil {
				err = fmt.Errorf("failed to add message to log buffer: %w", addErr)
				glog.Errorf("Failed to add message to log buffer: %v", addErr)
				break
			}

			// send back the ack
			if err := stream.Send(&mq_pb.PublishFollowMeResponse{
				AckTsNs: dataMessage.TsNs,
			}); err != nil {
				glog.Errorf("Error sending response %v: %v", dataMessage, err)
			}
			// println("ack", string(dataMessage.Key), dataMessage.TsNs)
		} else if closeMessage := req.GetClose(); closeMessage != nil {
			glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
			break
		} else if flushMessage := req.GetFlush(); flushMessage != nil {
			glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)

			lastFlushTsNs = flushMessage.TsNs

			// drop already flushed messages
			for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
				if mem.stopTime.UnixNano() <= flushMessage.TsNs {
					inMemoryBuffers.Dequeue()
					// println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
				} else {
					break
				}
			}

		} else {
			glog.Errorf("unknown message: %v", req)
		}
	}

	t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)

	logBuffer.ShutdownLogBuffer()
	// wait until all messages are sent to inMemoryBuffers
	for !logBuffer.IsAllFlushed() {
		time.Sleep(113 * time.Millisecond)
	}

	partitionDir := topic.PartitionDir(t, p)

	// flush the remaining messages
	inMemoryBuffers.CloseInput()
	for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
		if len(mem.buf) == 0 {
			continue
		}

		startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()

		if stopTime.UnixNano() <= lastFlushTsNs {
			glog.V(0).Infof("dropping remaining data at %v %v", t, p)
			continue
		}

		// TODO trim data earlier than lastFlushTsNs

		targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))

		for {
			if err := b.appendToFile(targetFile, mem.buf); err != nil {
				glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
				time.Sleep(737 * time.Millisecond)
			} else {
				break
			}
		}

		glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
	}

	glog.V(0).Infof("shut down follower for %v %v", t, p)

	return err
}

func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
	lb := log_buffer.NewLogBuffer("follower",
		5*time.Second, func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
			if len(buf) == 0 {
				return
			}
			inMemoryBuffers.Enqueue(memBuffer{
				buf:       buf,
				startTime: startTime,
				stopTime:  stopTime,
			})
			glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
		}, nil, func() {
		})
	return lb
}