diff options
Diffstat (limited to 'weed/mq/kafka/protocol/offset_management.go')
| -rw-r--r-- | weed/mq/kafka/protocol/offset_management.go | 89 |
1 files changed, 62 insertions, 27 deletions
diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 0a6e724fb..72ad13267 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -114,11 +115,10 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } - // Get consumer group - group := h.groupCoordinator.GetGroup(req.GroupID) - if group == nil { - return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil - } + // Get or create consumer group + // Some Kafka clients (like kafka-go Reader) commit offsets without formally joining + // the group via JoinGroup/SyncGroup. We need to support these "simple consumer" use cases. + group := h.groupCoordinator.GetOrCreateGroup(req.GroupID) group.Mu.Lock() defer group.Mu.Unlock() @@ -126,8 +126,14 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re // Update group's last activity group.LastActivity = time.Now() - // Require matching generation to store commits; return IllegalGeneration otherwise - generationMatches := (req.GenerationID == group.Generation) + // Check generation compatibility + // Allow commits for empty groups (no active members) to support simple consumers + // that commit offsets without formal group membership + groupIsEmpty := len(group.Members) == 0 + generationMatches := groupIsEmpty || (req.GenerationID == group.Generation) + + glog.V(3).Infof("[OFFSET_COMMIT] Group check: id=%s reqGen=%d groupGen=%d members=%d empty=%v matches=%v", + req.GroupID, req.GenerationID, group.Generation, len(group.Members), groupIsEmpty, generationMatches) // Process offset commits resp := OffsetCommitResponse{ @@ -143,7 +149,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re for _, p := range t.Partitions { - // Create consumer offset key for SMQ storage + // Create consumer offset key for SMQ storage (not used immediately) key := ConsumerOffsetKey{ Topic: t.Name, Partition: p.Index, @@ -151,16 +157,33 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re ConsumerGroupInstance: req.GroupInstanceID, } - // Commit offset using SMQ storage (persistent to filer) + // Commit offset synchronously for immediate consistency var errCode int16 = ErrorCodeNone if generationMatches { - if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + // Store in in-memory map for immediate response + // This is the primary committed offset position for consumers + if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { errCode = ErrorCodeOffsetMetadataTooLarge + glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit offset: group=%s topic=%s partition=%d offset=%d err=%v", + req.GroupID, t.Name, p.Index, p.Offset, err) } else { + // Also persist to SMQ storage for durability across broker restarts + // This is done synchronously to ensure offset is not lost + if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + // Log the error but don't fail the commit + // In-memory commit is the source of truth for active consumers + // SMQ persistence is best-effort for crash recovery + glog.V(3).Infof("[OFFSET_COMMIT] SMQ persist failed (non-fatal): group=%s topic=%s partition=%d offset=%d err=%v", + req.GroupID, t.Name, p.Index, p.Offset, err) + } + glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", + req.GroupID, t.Name, p.Index, p.Offset, group.Generation) } } else { // Do not store commit if generation mismatch errCode = 22 // IllegalGeneration + glog.V(2).Infof("[OFFSET_COMMIT] Rejected - generation mismatch: group=%s expected=%d got=%d members=%d", + req.GroupID, group.Generation, req.GenerationID, len(group.Members)) } topicResp.Partitions = append(topicResp.Partitions, OffsetCommitPartitionResponse{ @@ -187,15 +210,17 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } - // Get consumer group - group := h.groupCoordinator.GetGroup(request.GroupID) - if group == nil { - return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil - } + // Get or create consumer group + // IMPORTANT: Use GetOrCreateGroup (not GetGroup) to allow fetching persisted offsets + // even if the group doesn't exist in memory yet. This is critical for consumer restarts. + // Kafka allows offset fetches for groups that haven't joined yet (e.g., simple consumers). + group := h.groupCoordinator.GetOrCreateGroup(request.GroupID) group.Mu.RLock() defer group.Mu.RUnlock() + glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics)) + // Build response response := OffsetFetchResponse{ CorrelationID: correlationID, @@ -222,25 +247,35 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req // Fetch offsets for requested partitions for _, partition := range partitionsToFetch { - // Create consumer offset key for SMQ storage - key := ConsumerOffsetKey{ - Topic: topic.Name, - Partition: partition, - ConsumerGroup: request.GroupID, - ConsumerGroupInstance: request.GroupInstanceID, - } - var fetchedOffset int64 = -1 var metadata string = "" var errorCode int16 = ErrorCodeNone - // Fetch offset directly from SMQ storage (persistent storage) - // No cache needed - offset fetching is infrequent compared to commits - if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { + // Try fetching from in-memory cache first (works for both mock and SMQ backends) + if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 { fetchedOffset = off metadata = meta + glog.V(4).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d", + request.GroupID, topic.Name, partition, off) } else { - // No offset found in persistent storage (-1 indicates no committed offset) + // Fallback: try fetching from SMQ persistent storage + // This handles cases where offsets are stored in SMQ but not yet loaded into memory + key := ConsumerOffsetKey{ + Topic: topic.Name, + Partition: partition, + ConsumerGroup: request.GroupID, + ConsumerGroupInstance: request.GroupInstanceID, + } + if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { + fetchedOffset = off + metadata = meta + glog.V(3).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d", + request.GroupID, topic.Name, partition, off) + } else { + glog.V(3).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)", + request.GroupID, topic.Name, partition) + } + // No offset found in either location (-1 indicates no committed offset) } partitionResponse := OffsetFetchPartitionResponse{ |
