diff options
Diffstat (limited to 'weed/mq/kafka/protocol/offset_management.go')
| -rw-r--r-- | weed/mq/kafka/protocol/offset_management.go | 703 |
1 files changed, 703 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go new file mode 100644 index 000000000..0a6e724fb --- /dev/null +++ b/weed/mq/kafka/protocol/offset_management.go @@ -0,0 +1,703 @@ +package protocol + +import ( + "encoding/binary" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" +) + +// ConsumerOffsetKey uniquely identifies a consumer offset +type ConsumerOffsetKey struct { + ConsumerGroup string + Topic string + Partition int32 + ConsumerGroupInstance string // Optional - for static group membership +} + +// OffsetCommit API (key 8) - Commit consumer group offsets +// This API allows consumers to persist their current position in topic partitions + +// OffsetCommitRequest represents an OffsetCommit request from a Kafka client +type OffsetCommitRequest struct { + GroupID string + GenerationID int32 + MemberID string + GroupInstanceID string // Optional static membership ID + RetentionTime int64 // Offset retention time (-1 for broker default) + Topics []OffsetCommitTopic +} + +// OffsetCommitTopic represents topic-level offset commit data +type OffsetCommitTopic struct { + Name string + Partitions []OffsetCommitPartition +} + +// OffsetCommitPartition represents partition-level offset commit data +type OffsetCommitPartition struct { + Index int32 // Partition index + Offset int64 // Offset to commit + LeaderEpoch int32 // Leader epoch (-1 if not available) + Metadata string // Optional metadata +} + +// OffsetCommitResponse represents an OffsetCommit response to a Kafka client +type OffsetCommitResponse struct { + CorrelationID uint32 + Topics []OffsetCommitTopicResponse +} + +// OffsetCommitTopicResponse represents topic-level offset commit response +type OffsetCommitTopicResponse struct { + Name string + Partitions []OffsetCommitPartitionResponse +} + +// OffsetCommitPartitionResponse represents partition-level offset commit response +type OffsetCommitPartitionResponse struct { + Index int32 + ErrorCode int16 +} + +// OffsetFetch API (key 9) - Fetch consumer group committed offsets +// This API allows consumers to retrieve their last committed positions + +// OffsetFetchRequest represents an OffsetFetch request from a Kafka client +type OffsetFetchRequest struct { + GroupID string + GroupInstanceID string // Optional static membership ID + Topics []OffsetFetchTopic + RequireStable bool // Only fetch stable offsets +} + +// OffsetFetchTopic represents topic-level offset fetch data +type OffsetFetchTopic struct { + Name string + Partitions []int32 // Partition indices to fetch (empty = all partitions) +} + +// OffsetFetchResponse represents an OffsetFetch response to a Kafka client +type OffsetFetchResponse struct { + CorrelationID uint32 + Topics []OffsetFetchTopicResponse + ErrorCode int16 // Group-level error +} + +// OffsetFetchTopicResponse represents topic-level offset fetch response +type OffsetFetchTopicResponse struct { + Name string + Partitions []OffsetFetchPartitionResponse +} + +// OffsetFetchPartitionResponse represents partition-level offset fetch response +type OffsetFetchPartitionResponse struct { + Index int32 + Offset int64 // Committed offset (-1 if no offset) + LeaderEpoch int32 // Leader epoch (-1 if not available) + Metadata string // Optional metadata + ErrorCode int16 // Partition-level error +} + +// Error codes specific to offset management are imported from errors.go + +func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + // Parse OffsetCommit request + req, err := h.parseOffsetCommitRequest(requestBody, apiVersion) + if err != nil { + return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidCommitOffsetSize, apiVersion), nil + } + + // Validate request + if req.GroupID == "" || req.MemberID == "" { + return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil + } + + // Get consumer group + group := h.groupCoordinator.GetGroup(req.GroupID) + if group == nil { + return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil + } + + group.Mu.Lock() + defer group.Mu.Unlock() + + // Update group's last activity + group.LastActivity = time.Now() + + // Require matching generation to store commits; return IllegalGeneration otherwise + generationMatches := (req.GenerationID == group.Generation) + + // Process offset commits + resp := OffsetCommitResponse{ + CorrelationID: correlationID, + Topics: make([]OffsetCommitTopicResponse, 0, len(req.Topics)), + } + + for _, t := range req.Topics { + topicResp := OffsetCommitTopicResponse{ + Name: t.Name, + Partitions: make([]OffsetCommitPartitionResponse, 0, len(t.Partitions)), + } + + for _, p := range t.Partitions { + + // Create consumer offset key for SMQ storage + key := ConsumerOffsetKey{ + Topic: t.Name, + Partition: p.Index, + ConsumerGroup: req.GroupID, + ConsumerGroupInstance: req.GroupInstanceID, + } + + // Commit offset using SMQ storage (persistent to filer) + var errCode int16 = ErrorCodeNone + if generationMatches { + if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + errCode = ErrorCodeOffsetMetadataTooLarge + } else { + } + } else { + // Do not store commit if generation mismatch + errCode = 22 // IllegalGeneration + } + + topicResp.Partitions = append(topicResp.Partitions, OffsetCommitPartitionResponse{ + Index: p.Index, + ErrorCode: errCode, + }) + } + + resp.Topics = append(resp.Topics, topicResp) + } + + return h.buildOffsetCommitResponse(resp, apiVersion), nil +} + +func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + // Parse OffsetFetch request + request, err := h.parseOffsetFetchRequest(requestBody) + if err != nil { + return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + // Validate request + if request.GroupID == "" { + return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + // Get consumer group + group := h.groupCoordinator.GetGroup(request.GroupID) + if group == nil { + return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + group.Mu.RLock() + defer group.Mu.RUnlock() + + // Build response + response := OffsetFetchResponse{ + CorrelationID: correlationID, + Topics: make([]OffsetFetchTopicResponse, 0, len(request.Topics)), + ErrorCode: ErrorCodeNone, + } + + for _, topic := range request.Topics { + topicResponse := OffsetFetchTopicResponse{ + Name: topic.Name, + Partitions: make([]OffsetFetchPartitionResponse, 0), + } + + // If no partitions specified, fetch all partitions for the topic + partitionsToFetch := topic.Partitions + if len(partitionsToFetch) == 0 { + // Get all partitions for this topic from group's offset commits + if topicOffsets, exists := group.OffsetCommits[topic.Name]; exists { + for partition := range topicOffsets { + partitionsToFetch = append(partitionsToFetch, partition) + } + } + } + + // Fetch offsets for requested partitions + for _, partition := range partitionsToFetch { + // Create consumer offset key for SMQ storage + key := ConsumerOffsetKey{ + Topic: topic.Name, + Partition: partition, + ConsumerGroup: request.GroupID, + ConsumerGroupInstance: request.GroupInstanceID, + } + + var fetchedOffset int64 = -1 + var metadata string = "" + var errorCode int16 = ErrorCodeNone + + // Fetch offset directly from SMQ storage (persistent storage) + // No cache needed - offset fetching is infrequent compared to commits + if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { + fetchedOffset = off + metadata = meta + } else { + // No offset found in persistent storage (-1 indicates no committed offset) + } + + partitionResponse := OffsetFetchPartitionResponse{ + Index: partition, + Offset: fetchedOffset, + LeaderEpoch: 0, // Default epoch for SeaweedMQ (single leader model) + Metadata: metadata, + ErrorCode: errorCode, + } + topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse) + } + + response.Topics = append(response.Topics, topicResponse) + } + + return h.buildOffsetFetchResponse(response, apiVersion), nil +} + +func (h *Handler) parseOffsetCommitRequest(data []byte, apiVersion uint16) (*OffsetCommitRequest, error) { + if len(data) < 8 { + return nil, fmt.Errorf("request too short") + } + + offset := 0 + + // GroupID (string) + groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + if offset+groupIDLength > len(data) { + return nil, fmt.Errorf("invalid group ID length") + } + groupID := string(data[offset : offset+groupIDLength]) + offset += groupIDLength + + // Generation ID (4 bytes) + if offset+4 > len(data) { + return nil, fmt.Errorf("missing generation ID") + } + generationID := int32(binary.BigEndian.Uint32(data[offset:])) + offset += 4 + + // MemberID (string) + if offset+2 > len(data) { + return nil, fmt.Errorf("missing member ID length") + } + memberIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + if offset+memberIDLength > len(data) { + return nil, fmt.Errorf("invalid member ID length") + } + memberID := string(data[offset : offset+memberIDLength]) + offset += memberIDLength + + // RetentionTime (8 bytes) - exists in v0-v4, removed in v5+ + var retentionTime int64 = -1 + if apiVersion <= 4 { + if len(data) < offset+8 { + return nil, fmt.Errorf("missing retention time for v%d", apiVersion) + } + retentionTime = int64(binary.BigEndian.Uint64(data[offset : offset+8])) + offset += 8 + } + + // GroupInstanceID (nullable string) - ONLY in version 3+ + var groupInstanceID string + if apiVersion >= 3 { + if offset+2 > len(data) { + return nil, fmt.Errorf("missing group instance ID length") + } + groupInstanceIDLength := int(int16(binary.BigEndian.Uint16(data[offset:]))) + offset += 2 + if groupInstanceIDLength == -1 { + // Null string + groupInstanceID = "" + } else if groupInstanceIDLength > 0 { + if offset+groupInstanceIDLength > len(data) { + return nil, fmt.Errorf("invalid group instance ID length") + } + groupInstanceID = string(data[offset : offset+groupInstanceIDLength]) + offset += groupInstanceIDLength + } + } + + // Topics array + var topicsCount uint32 + if len(data) >= offset+4 { + topicsCount = binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + } + + topics := make([]OffsetCommitTopic, 0, topicsCount) + + for i := uint32(0); i < topicsCount && offset < len(data); i++ { + // Parse topic name + if len(data) < offset+2 { + break + } + topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 + + if len(data) < offset+int(topicNameLength) { + break + } + topicName := string(data[offset : offset+int(topicNameLength)]) + offset += int(topicNameLength) + + // Parse partitions array + if len(data) < offset+4 { + break + } + partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + + partitions := make([]OffsetCommitPartition, 0, partitionsCount) + + for j := uint32(0); j < partitionsCount && offset < len(data); j++ { + // Parse partition index (4 bytes) + if len(data) < offset+4 { + break + } + partitionIndex := int32(binary.BigEndian.Uint32(data[offset : offset+4])) + offset += 4 + + // Parse committed offset (8 bytes) + if len(data) < offset+8 { + break + } + committedOffset := int64(binary.BigEndian.Uint64(data[offset : offset+8])) + offset += 8 + + // Parse leader epoch (4 bytes) - ONLY in version 6+ + var leaderEpoch int32 = -1 + if apiVersion >= 6 { + if len(data) < offset+4 { + break + } + leaderEpoch = int32(binary.BigEndian.Uint32(data[offset : offset+4])) + offset += 4 + } + + // Parse metadata (string) + var metadata string = "" + if len(data) >= offset+2 { + metadataLength := int16(binary.BigEndian.Uint16(data[offset : offset+2])) + offset += 2 + if metadataLength == -1 { + metadata = "" + } else if metadataLength >= 0 && len(data) >= offset+int(metadataLength) { + metadata = string(data[offset : offset+int(metadataLength)]) + offset += int(metadataLength) + } + } + + partitions = append(partitions, OffsetCommitPartition{ + Index: partitionIndex, + Offset: committedOffset, + LeaderEpoch: leaderEpoch, + Metadata: metadata, + }) + } + topics = append(topics, OffsetCommitTopic{ + Name: topicName, + Partitions: partitions, + }) + } + + return &OffsetCommitRequest{ + GroupID: groupID, + GenerationID: generationID, + MemberID: memberID, + GroupInstanceID: groupInstanceID, + RetentionTime: retentionTime, + Topics: topics, + }, nil +} + +func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error) { + if len(data) < 4 { + return nil, fmt.Errorf("request too short") + } + + offset := 0 + + // GroupID (string) + groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + if offset+groupIDLength > len(data) { + return nil, fmt.Errorf("invalid group ID length") + } + groupID := string(data[offset : offset+groupIDLength]) + offset += groupIDLength + + // Parse Topics array - classic encoding (INT32 count) for v0-v5 + if len(data) < offset+4 { + return nil, fmt.Errorf("OffsetFetch request missing topics array") + } + topicsCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + + topics := make([]OffsetFetchTopic, 0, topicsCount) + + for i := uint32(0); i < topicsCount && offset < len(data); i++ { + // Parse topic name (STRING: INT16 length + bytes) + if len(data) < offset+2 { + break + } + topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 + + if len(data) < offset+int(topicNameLength) { + break + } + topicName := string(data[offset : offset+int(topicNameLength)]) + offset += int(topicNameLength) + + // Parse partitions array (ARRAY: INT32 count) + if len(data) < offset+4 { + break + } + partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + + partitions := make([]int32, 0, partitionsCount) + + // If partitionsCount is 0, it means "fetch all partitions" + if partitionsCount == 0 { + partitions = nil // nil means all partitions + } else { + for j := uint32(0); j < partitionsCount && offset < len(data); j++ { + // Parse partition index (4 bytes) + if len(data) < offset+4 { + break + } + partitionIndex := int32(binary.BigEndian.Uint32(data[offset : offset+4])) + offset += 4 + + partitions = append(partitions, partitionIndex) + } + } + + topics = append(topics, OffsetFetchTopic{ + Name: topicName, + Partitions: partitions, + }) + } + + // Parse RequireStable flag (1 byte) - for transactional consistency + var requireStable bool + if len(data) >= offset+1 { + requireStable = data[offset] != 0 + offset += 1 + } + + return &OffsetFetchRequest{ + GroupID: groupID, + Topics: topics, + RequireStable: requireStable, + }, nil +} + +func (h *Handler) commitOffset(group *consumer.ConsumerGroup, topic string, partition int32, offset int64, metadata string) error { + // Initialize topic offsets if needed + if group.OffsetCommits == nil { + group.OffsetCommits = make(map[string]map[int32]consumer.OffsetCommit) + } + + if group.OffsetCommits[topic] == nil { + group.OffsetCommits[topic] = make(map[int32]consumer.OffsetCommit) + } + + // Store the offset commit + group.OffsetCommits[topic][partition] = consumer.OffsetCommit{ + Offset: offset, + Metadata: metadata, + Timestamp: time.Now(), + } + + return nil +} + +func (h *Handler) fetchOffset(group *consumer.ConsumerGroup, topic string, partition int32) (int64, string, error) { + // Check if topic exists in offset commits + if group.OffsetCommits == nil { + return -1, "", nil // No committed offset + } + + topicOffsets, exists := group.OffsetCommits[topic] + if !exists { + return -1, "", nil // No committed offset for topic + } + + offsetCommit, exists := topicOffsets[partition] + if !exists { + return -1, "", nil // No committed offset for partition + } + + return offsetCommit.Offset, offsetCommit.Metadata, nil +} + +func (h *Handler) buildOffsetCommitResponse(response OffsetCommitResponse, apiVersion uint16) []byte { + estimatedSize := 16 + for _, topic := range response.Topics { + estimatedSize += len(topic.Name) + 8 + len(topic.Partitions)*8 + } + + result := make([]byte, 0, estimatedSize) + + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // Throttle time (4 bytes) - ONLY for version 3+, and it goes at the BEGINNING + if apiVersion >= 3 { + result = append(result, 0, 0, 0, 0) // throttle_time_ms = 0 + } + + // Topics array length (4 bytes) + topicsLengthBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsLengthBytes, uint32(len(response.Topics))) + result = append(result, topicsLengthBytes...) + + // Topics + for _, topic := range response.Topics { + // Topic name length (2 bytes) + nameLength := make([]byte, 2) + binary.BigEndian.PutUint16(nameLength, uint16(len(topic.Name))) + result = append(result, nameLength...) + + // Topic name + result = append(result, []byte(topic.Name)...) + + // Partitions array length (4 bytes) + partitionsLength := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsLength, uint32(len(topic.Partitions))) + result = append(result, partitionsLength...) + + // Partitions + for _, partition := range topic.Partitions { + // Partition index (4 bytes) + indexBytes := make([]byte, 4) + binary.BigEndian.PutUint32(indexBytes, uint32(partition.Index)) + result = append(result, indexBytes...) + + // Error code (2 bytes) + errorBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorBytes, uint16(partition.ErrorCode)) + result = append(result, errorBytes...) + } + } + + return result +} + +func (h *Handler) buildOffsetFetchResponse(response OffsetFetchResponse, apiVersion uint16) []byte { + estimatedSize := 32 + for _, topic := range response.Topics { + estimatedSize += len(topic.Name) + 16 + len(topic.Partitions)*32 + for _, partition := range topic.Partitions { + estimatedSize += len(partition.Metadata) + } + } + + result := make([]byte, 0, estimatedSize) + + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // Throttle time (4 bytes) - for version 3+ this appears immediately after correlation ID + if apiVersion >= 3 { + result = append(result, 0, 0, 0, 0) // throttle_time_ms = 0 + } + + // Topics array length (4 bytes) + topicsLengthBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsLengthBytes, uint32(len(response.Topics))) + result = append(result, topicsLengthBytes...) + + // Topics + for _, topic := range response.Topics { + // Topic name length (2 bytes) + nameLength := make([]byte, 2) + binary.BigEndian.PutUint16(nameLength, uint16(len(topic.Name))) + result = append(result, nameLength...) + + // Topic name + result = append(result, []byte(topic.Name)...) + + // Partitions array length (4 bytes) + partitionsLength := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsLength, uint32(len(topic.Partitions))) + result = append(result, partitionsLength...) + + // Partitions + for _, partition := range topic.Partitions { + // Partition index (4 bytes) + indexBytes := make([]byte, 4) + binary.BigEndian.PutUint32(indexBytes, uint32(partition.Index)) + result = append(result, indexBytes...) + + // Committed offset (8 bytes) + offsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(offsetBytes, uint64(partition.Offset)) + result = append(result, offsetBytes...) + + // Leader epoch (4 bytes) - only included in version 5+ + if apiVersion >= 5 { + epochBytes := make([]byte, 4) + binary.BigEndian.PutUint32(epochBytes, uint32(partition.LeaderEpoch)) + result = append(result, epochBytes...) + } + + // Metadata length (2 bytes) + metadataLength := make([]byte, 2) + binary.BigEndian.PutUint16(metadataLength, uint16(len(partition.Metadata))) + result = append(result, metadataLength...) + + // Metadata + result = append(result, []byte(partition.Metadata)...) + + // Error code (2 bytes) + errorBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorBytes, uint16(partition.ErrorCode)) + result = append(result, errorBytes...) + } + } + + // Group-level error code (2 bytes) - only included in version 2+ + if apiVersion >= 2 { + groupErrorBytes := make([]byte, 2) + binary.BigEndian.PutUint16(groupErrorBytes, uint16(response.ErrorCode)) + result = append(result, groupErrorBytes...) + } + + return result +} + +func (h *Handler) buildOffsetCommitErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte { + response := OffsetCommitResponse{ + CorrelationID: correlationID, + Topics: []OffsetCommitTopicResponse{ + { + Name: "", + Partitions: []OffsetCommitPartitionResponse{ + {Index: 0, ErrorCode: errorCode}, + }, + }, + }, + } + + return h.buildOffsetCommitResponse(response, apiVersion) +} + +func (h *Handler) buildOffsetFetchErrorResponse(correlationID uint32, errorCode int16) []byte { + response := OffsetFetchResponse{ + CorrelationID: correlationID, + Topics: []OffsetFetchTopicResponse{}, + ErrorCode: errorCode, + } + + return h.buildOffsetFetchResponse(response, 0) +} |
