aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition_offset.go
blob: 9c8a2dac49cb9855f4850a743bf6c2001baa0ced (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package topic

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// OffsetAssignmentFunc is a function type for assigning offsets to messages.
// Each call returns the offset to use for one message, or an error if no
// offset could be assigned.
type OffsetAssignmentFunc func() (int64, error)

// PublishWithOffset publishes a message with offset assignment.
// This method is used by the Kafka gateway integration for sequential offset assignment.
//
// It obtains an offset from assignOffsetFn, appends the message to the
// partition's log buffer with that offset, records publish activity, and then
// either forwards the message on the follower stream or, when no follower
// stream is set, stores message.TsNs as the acknowledged timestamp.
//
// On success it returns the assigned offset. On failure it returns 0 and an
// error wrapping the underlying cause, so callers can inspect it with
// errors.Is / errors.As. A nil message or nil assignOffsetFn is reported as an
// error rather than causing a nil-pointer panic.
//
// Note: the offset is obtained before the buffer write, so an offset that has
// already been assigned is not handed back if a later step fails.
func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOffsetFn OffsetAssignmentFunc) (int64, error) {
	// Validate inputs before consuming an offset.
	if message == nil {
		return 0, fmt.Errorf("publish with offset: nil message")
	}
	if assignOffsetFn == nil {
		return 0, fmt.Errorf("publish with offset: nil offset assignment function")
	}

	// Assign offset for this message
	offset, err := assignOffsetFn()
	if err != nil {
		return 0, fmt.Errorf("failed to assign offset: %w", err)
	}

	// Add message to buffer with offset
	if err := p.addToBufferWithOffset(message, offset); err != nil {
		return 0, fmt.Errorf("failed to add message to buffer: %w", err)
	}

	// Track publish activity for idle cleanup (consistent with Publish method)
	p.UpdateActivity()

	// Send to follower if needed (same logic as original Publish)
	if p.publishFolloweMeStream != nil {
		if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
			Message: &mq_pb.PublishFollowMeRequest_Data{
				Data: message,
			},
		}); followErr != nil {
			// Wrap with %w (not %v) so the stream error stays in the chain,
			// matching the other error paths in this function.
			return 0, fmt.Errorf("send to follower %s: %w", p.Follower, followErr)
		}
	} else {
		// No follower stream: record this message's timestamp as acknowledged.
		atomic.StoreInt64(&p.AckTsNs, message.TsNs)
	}

	return offset, nil
}

// addToBufferWithOffset wraps message in a LogEntry carrying the caller-supplied
// offset and appends that entry to the partition's log buffer.
//
// The entry's timestamp is message.TsNs, or the current wall-clock time in
// nanoseconds when the message carries no timestamp (TsNs == 0). The message
// itself is not modified. Any error from the log buffer is returned wrapped.
func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offset int64) error {
	entry := &filer_pb.LogEntry{
		TsNs:             message.TsNs,
		PartitionKeyHash: util.HashToInt32(message.Key),
		Data:             message.Value,
		Key:              message.Key,
		Offset:           offset, // keep the pre-assigned sequential offset
	}

	// Fall back to "now" for messages published without a timestamp.
	if entry.TsNs == 0 {
		entry.TsNs = time.Now().UnixNano()
	}

	appendErr := p.LogBuffer.AddLogEntryToBuffer(entry)
	if appendErr == nil {
		return nil
	}
	return fmt.Errorf("failed to add log entry to buffer: %w", appendErr)
}

// GetOffsetInfo collects this partition's range fields and its log buffer's
// name and offset into a string-keyed map.
// It is intended for debugging and monitoring partition offset state.
func (p *LocalPartition) GetOffsetInfo() map[string]interface{} {
	info := make(map[string]interface{}, 6)

	// Partition range fields.
	info["partition_ring_size"] = p.RingSize
	info["partition_range_start"] = p.RangeStart
	info["partition_range_stop"] = p.RangeStop
	info["partition_unix_time"] = p.UnixTimeNs

	// Log buffer state.
	info["buffer_name"] = p.LogBuffer.GetName()
	info["buffer_offset"] = p.LogBuffer.GetOffset()

	return info
}

// OffsetAwarePublisher wraps a LocalPartition with offset assignment capability.
// It pairs a partition with the offset assignment function that its Publish
// method passes to LocalPartition.PublishWithOffset.
type OffsetAwarePublisher struct {
	partition      *LocalPartition      // partition that messages are published to
	assignOffsetFn OffsetAssignmentFunc // supplies the offset for each published message
}

// NewOffsetAwarePublisher builds an OffsetAwarePublisher that publishes to the
// given partition and obtains offsets from assignOffsetFn.
// Both arguments are stored as given; no validation is performed here.
func NewOffsetAwarePublisher(partition *LocalPartition, assignOffsetFn OffsetAssignmentFunc) *OffsetAwarePublisher {
	publisher := new(OffsetAwarePublisher)
	publisher.partition = partition
	publisher.assignOffsetFn = assignOffsetFn
	return publisher
}

// Publish hands message to the wrapped partition's PublishWithOffset together
// with the publisher's offset assignment function.
// The assigned offset is discarded; only the error, if any, is returned.
func (oap *OffsetAwarePublisher) Publish(message *mq_pb.DataMessage) error {
	if _, publishErr := oap.partition.PublishWithOffset(message, oap.assignOffsetFn); publishErr != nil {
		return publishErr
	}
	return nil
}

// GetPartition exposes the LocalPartition this publisher wraps.
// The returned pointer is the same one supplied at construction time.
func (oap *OffsetAwarePublisher) GetPartition() *LocalPartition {
	wrapped := oap.partition
	return wrapped
}